Sparksql的介绍以及常见操作
时间:2022-03-15 17:55
撰写本文的目的:对于sparksql,网上有大量的详细文档,本人针对常用的操作进行一个整理,当然有大多数都是从其他地方搬过来的,包括官方文档以及其他网友的一些分享,一来是通过此次整理加强自己的记忆,二来如果有幸帮到某位网友,那是本人莫大的荣幸,先感谢您的阅读,废话不多说,进入正文:
下文所涉及到的相关软件版本分别为:
spark版本:v2.2.0
hive : v1.2.1
hadoop : v2.7.6
前言:
Spark sql是spark处理结构化数据的一个模块,它的前身是shark,与基础的spark rdd不同,spark sql提供了结构化数据及计算结果等信息的接口,在内部,spark sql使用这个额外的信息去执行额外的优化,有几种方式可以跟spark sql进行交互,包括sql和dataset api,使用相同的执行引擎进行计算的时候,无论是使用哪一种计算引擎都可以一快速的计算。
Dataset and DataFrames
RDD:在spark刚开始的时候,引入RDD(弹性分布式数据集)
优点:
1)编译时类型安全,编译时就能检查出类型错误
2)面向对象的编程分格,直接通过类名点的方式来操作数据
例如:idAge.filter(_.age > "") //编译时直接报错
idAgeRDDPerson.filter(_.age > 25) //直接操作一个个的person对象
缺点:
1)序列化和反序列化的性能开销,无论是集群间的通信还是IO操作,都需要对对象的结果和数据进行序列化和反序列化
2)GC的性能开销,频繁的创建和销毁对象,势必会增加GC
DataFrame:spark1.3的时候引入了DataFrmae,是一个列方式组织的分布式数据集
优点:
1)引入了Schema,包含了一ROW位单位的每行数据的列信息,spark通过Schema就能够读懂数据,因此在通信和IO时就只需要序列化和反序列化数据,而结构的部分就可以省略了;
2)off-heap:spark能够以二进制的形式序列化数据(不包括结构)到off-heap(堆外内存),当要操作数据时,就直接操作off-heap内存,off-heap类似于地盘,schema类似于地图,Spark有了地图又有了自己地盘了,就可以自己说了算,不再受JVM的限制,也就不再受GC的困扰了,通过Schema和off-heap,DataFrame克服了RDD的缺点。对比RDD提升了计算效率,减少了数据的读取,底层计算优化
3)引入了新的引擎:Tungsten
4)引入了新的语法解析框架:Catalyst
缺点:
DataFrame客服了RDD 的缺点,但是丢失了RDD的优点,DataFrame不是类型安全的,API也不是面向对象分格的。
1)API不是面向对象的
idAgeDF.filter(idAgeDF.col("age") > 22)
2)DataFrame不是编译时类型安全的,下面这种情况下不会报错
idAgeDF.filter(idAgeDF.col("age") > "")
DataSet:到spark1.6的时候引入了DataSet,Encoder分布式数据集,是一个被添加的新接口,它提供了RDD 的优点(强类型化,能够使用强大的lambda函数)
/** * @groupname basic Basic Dataset functions * @groupname action Actions * @groupname untypedrel Untyped transformations * @groupname typedrel Typed transformations * * @since 1.6.0 */ @InterfaceStability.Stable class Dataset[T] private[sql]( @transient val sparkSession: SparkSession, @DeveloperApi @InterfaceStability.Unstable @transient val queryExecution: QueryExecution, encoder: Encoder[T]) extends Serializable {
DataSet是一个类,其中包含了三个参数:
DataFrame
1、集合转DataFrame
val ssc = SparkSession().Builder.master("test").appName("test").getOrCreate val seq1 = Seq(("Jack", 28, 184), ("Tom", 10, 144), ("Andy", 16, 165)) val df1 = ssc.createDataFrame(seq1).withColumnRenamed("_1", "name1"). withColumnRenamed("_2", "age1").withColumnRenamed("_3", "height1") df1.orderBy(desc("age1")).show(10) import ssc.implicit._ val df2 = ssc.createDataFrame(seq1).toDF("name", "age", "height")
2、RDD转DataFrame
import org.apache.spark.sql.Row import org.apache.spark.sql.types._ val arr = Array(("Jack", 28, 184), ("Tom", 10, 144), ("Andy", 16, 165)) val rdd1 = sc.makeRDD(arr).map(f=>Row(f._1, f._2, f._3)) val schema = StructType( StructField("name", StringType, false) :: StructField("age", IntegerType, false) :: StructField("height", IntegerType, false) :: Nil)
// false:说明该字段不允许为null true:说明该字段可以为null val rddToDF = spark.createDataFrame(rdd1, schema) rddToDF.orderBy(desc("name")).show(false)
DataSet
1、由range生成DataSet
val numDS = spark.range(5,100,5) numDS.orderBy(desc("id")).show(5) numDS.describe().show
2、由集合生成DS
case class Person(name:String, age:Int, height:Int) val seq1 = Seq(Person("Jack", 28, 184), Person("Tom", 10, 144), Person("Andy", 16, 165)) val spark:SparkSession = SparkSession.Builder.... val ds1 = spark.createDataset(seq1) ds1.show val seq2 = Seq(("Jack", 28, 184), ("Tom", 10, 144), ("Andy", 16, 165)) val ds2 = spark.createDataset(seq2) ds2.show
3、由RDD进行转换
import org.apache.spark.sql.types._ import org.apache.spark.sql.Row val arr = Array(("Jack", 28, 184), ("Tom", 10, 144), ("Andy", 16, 165)) val rdd2 = sc.makeRDD(arr).map(f=>Row(f._1, f._2, f._3))
val rdd3 = sc.makeRDD(arr).map(f=>Person(r._1,f._2,f._3)) val ds3 = sc.createDataset(rdd2)
val ds4 = rdd3.toDS() ds3.show(10)
通过SparkSession读取文件
import org.apache.spark.sql.types._ val schema2 = StructType( StructField("name", StringType, false) :: StructField("age", IntegerType, false) :: StructField("height", IntegerType, false) :: Nil) val df7 = ssc.read.options(Map(("delimiter", ","), ("header", "false"))).schema(schema2).csv("file:///home/spark/t01.csv") // 读取本地文件 df7.show()
DataSet的基础函数
import org.apache.spark.storage.StorageLevel import org.apache.spark.sql.types._ case class Person(name:String, age:Int, height:Int) spark.sparkContext.setCheckpointDir("hdfs://node1:8020/checkpoint") // 1 DataSet存储类型 val seq1 = Seq(Person("Jack", 28, 184), Person("Tom", 10, 144), Person("Andy", 16, 165)) val ds1 = spark.createDataset(seq1) ds1.show() ds1.checkpoint() ds1.cache() ds1.persist(StorageLevel.MEMORY_ONLY) ds1.count() ds1.show() ds1.unpersist(true) // 2 DataSet结构属性 ds1.columns ds1.dtypes ds1.explain() ds1.col("name") ds1.printSchema // 常用 // 3 DataSet rdd数据互转 val rdd1 = ds1.rdd val ds2 = rdd1.toDS() ds2.show() val df2 = rdd1.toDF() df2.show() // 4 Dataset保存文件 ds1.select("name", "age", "height").write.format("csv").save("data/sql1/my01.csv") // 读取保存的文件 val schema2 = StructType( StructField("name", StringType, false) :: StructField("age", IntegerType, false) :: StructField("height", IntegerType, false) :: Nil) val out = spark.read. options(Map(("delimiter", ","), ("header", "false"))). schema(schema2).csv("data/sql1/*") out.show(10)
DataSet的Action操作
// 1 显示数据集 val seq1 = Seq(Person("Jack", 28, 184), Person("Tom", 10, 144), Person("Andy", 16, 165)) val ds1 = spark.createDataset(seq1) // 缺省显示20行 ds1.show() // 显示2行 ds1.show(2) // 显示20行,不截断字符 ds1.show(20, false) // 2 获取数据集 // collect返回的是数组 val c1 = ds1.collect() // collectAsList返回的是List val c2 = ds1.collectAsList() val h1 = ds1.head() val h2 = ds1.head(3) val f1 = ds1.first() val f2 = ds1.take(2) val t2 = ds1.takeAsList(2) ds.limit(10).show // 取10行数据生成新的DataSet // 3 统计数据集 ds1.count() // 返回全部列的统计(count、mean、stddev、min、max) ds1.describe().show // 返回指定列的统计(count、mean、stddev、min、max) ds1.describe("age").show ds1.describe("age", "height").show // 4 聚集 ds1.reduce{ (f1, f2) => Person("sum", f1.age+f2.age, f1.height+f2.height) }