您的位置:首页 > 博客中心 > 数据库 >

SparkSQL简易入门

时间:2022-03-14 04:48

SparkSQL操作文本文件

val sqlContext = new org.apache.spark.sql.SQLContext(sc)
import sqlContext._
case class PageViews(track_time: String, url: String, session_id: String,referer: String, ip: String,end_user_id: String, city_id:String)
val page_views = sc.textFile("hdfs://hadoop000:8020/sparksql/page_views.dat").map(_.split("\t")).map(p => PageViews(p(0), p(1), p(2), p(3), p(4), p(5), p(6)))
page_views.registerTempTable("page_views")
val sql1 = sql("SELECT track_time, url, session_id, referer, ip, end_user_id, city_id FROM page_views WHERE city_id = -1000 limit 10")
sql1.collect()
val sql2 = sql("SELECT session_id, count(*) c FROM page_views group by session_id order by c desc limit 10")
sql2.collect()

 

SparkSQL操作Parquet文件

SparkSQL支持读取Parquet中的数据、支持写到Parquet中时保存元数据的schema信息;列式存储避免读出不需要的数据,提高查询效率,减少GC;

val sqlContext = new org.apache.spark.sql.SQLContext(sc)
import sqlContext._
case class Person(name: String, age: Int)
val people = sc.textFile("hdfs://hadoop000:8020/sparksql/resources/people.txt").map(_.split(",")).map(p => Person(p(0), p(1).trim.toInt))
people.saveAsParquetFile("hdfs://hadoop000:8020/sparksql/resources/people.parquet") //存
val parquetFile = sqlContext.parquetFile("hdfs://hadoop000:8020/sparksql/resources/people.parquet") //读
parquetFile.registerAsTable("parquetFile") 
val teenagers = sql("SELECT name FROM parquetFile WHERE age >= 13 AND age <= 19")
teenagers.map(t => "Name: " + t(0)).collect

 

SparkSQL操作json文件

val sqlContext = new org.apache.spark.sql.SQLContext(sc)
val path = "hdfs://hadoop000:8020/sparksql/resources/people.json"
val people = sqlContext.jsonFile(path)
import sqlContext._

people.printSchema()
people.registerTempTable("people")
val teenagers = sql("SELECT name FROM people WHERE age >= 13 AND age <= 19")
teenagers.collect

val anotherPeopleRDD = sc.parallelize(
  """{"name":"Yin","address":{"city":"Columbus","state":"Ohio"}}""" :: Nil)
val anotherPeople = sqlContext.jsonRDD(anotherPeopleRDD)
anotherPeople.collect

 

SparkSQL操作DSL

使用DSL我们可以直接基于读取的RDD数据进行SQL操作,无需注册成Table,用Scala的symbols代表table中的每一列;

val sqlContext = new org.apache.spark.sql.SQLContext(sc)
import sqlContext._
case class Person(name: String, age: Int)
val people = sc.textFile("hdfs://hadoop000:8020/sparksql/resources/people.txt").map(_.split(",")).map(p => Person(p(0), p(1).trim.toInt))
val teenagers = people.where(‘age >= 10).where(‘age <= 19).select(‘name)
teenagers.toDebugString
teenagers.map(t => "Name: " + t(0)).collect().foreach(println)

 

SparkSQL操作已有的hive表

spark-shell方式访问

val hiveContext = new org.apache.spark.sql.hive.HiveContext(sc)
import hiveContext._
sql("SELECT track_time, url, session_id, referer, ip, end_user_id, city_id FROM page_views WHERE city_id = -1000 limit 10").collect().foreach(println)
sql("SELECT session_id, count(*) c FROM page_views group by session_id order by c desc limit 10").collect().foreach(println)

spark-sql方式访问

需要将hive-site.xml拷贝到$SPARK_HOME/conf下

SELECT track_time, url, session_id, referer, ip, end_user_id, city_id FROM page_views WHERE city_id = -1000 limit 10;
SELECT session_id, count(*) c FROM page_views group by session_id order by c desc limit 10;

hive-thriftserver方式访问:

1)启动hive-thriftserver:

cd $SPARK_HOME/sbin
start-thriftserver.sh 

指定端口方式启动:start-thriftserver.sh --hiveconf hive.server2.thrift.port=14000

2)启动beeline客户端:

cd $SPARK_HOME/bin
beeline -u jdbc:hive2://hadoop000:10000/default -n spark
SELECT track_time, url, session_id, referer, ip, end_user_id, city_id FROM page_views WHERE city_id = -1000 limit 10;
SELECT session_id, count(*) c FROM page_views group by session_id order by c desc limit 10;

 

 SparkSQL缓存表

在Spark1.2版本之后注意事项:

1)使用SchemaRDD.cache或者SQLContext.cacheTable,都采用列式存储的方式缓存到内存中;

2)SQLContext.cacheTable/uncacheTable都是eager的,而不再是lazy;不再需要手工触发action后才进行缓存;

3)可以通过CACHE [LAZY] TABLE tb1 [AS SELECT ...] 手工设置LAZY或者EAGER;

 cacheTable后注意观察WEBUI界面Stroage的变化

val hiveContext = new org.apache.spark.sql.hive.HiveContext(sc)
import hiveContext._
sql("cache table page_views")
sql("select session_id, count(session_id) as c from page_views  group by session_id order by c desc limit 10").collect().foreach(println)
sql("uncache table page_views")

 

val hiveContext = new org.apache.spark.sql.hive.HiveContext(sc)
import hiveContext._
sql("CACHE TABLE page_views_cached_eager AS SELECT * FROM page_views") 
sql("select session_id, count(session_id) as c from page_views_cached_eager  group by session_id order by c desc limit 10").collect().foreach(println)
uncacheTable("page_views_cached") 
val hiveContext = new org.apache.spark.sql.hive.HiveContext(sc)
import hiveContext._
sql("CACHE LAZY TABLE page_views_cached_lazy AS SELECT * FROM page_views") 
sql("select count(*) as c from page_views_cached_lazy").collect().foreach(println)
sql("select session_id, count(session_id) as c from page_views_cached_lazy  group by session_id order by c desc limit 10").collect().foreach(println)
uncacheTable("page_views_cached") 

 

热门排行

今日推荐

热门手游