您的位置:首页 > 博客中心 > 数据库 >

sparkSQL1.1入门之二:sparkSQL运行架构

时间:2022-03-14 00:46

      在介绍sparkSQL之前,我们首先来看看,传统的关系型数据库是怎么运行的。当我们提交了一个很简单的查询:

SELECT  a1,a2,a3  FROM  tableA  Where  condition 
gxlsystem.com,布布扣
可以看得出来,该语句是由Projection(a1,a2,a3)、Data Source(tableA)、Filter(condition)组成,分别对应sql查询过程中的Result、Data Source、Operation,也就是说SQL语句按Result-->Data Source-->Operation的次序来描述的。那么,SQL语句在实际的运行过程中是怎么处理的呢?一般的数据库系统先将读入的SQL语句(Query)先进行解析(Parse),分辨出SQL语句中哪些词是关键词(如SELECT、FROM、WHERE),哪些是表达式、哪些是Projection、哪些是Data Source等等。这一步就可以判断SQL语句是否规范,不规范就报错,规范就继续下一步过程绑定(Bind),这个过程将SQL语句和数据库的数据字典(列、表、视图等等)进行绑定,如果相关的Projection、Data Source等等都是存在的话,就表示这个SQL语句是可以执行的;而在执行前,一般的数据库会提供几个执行计划,这些计划一般都有运行统计数据,数据库会在这些计划中选择一个最优计划(Optimize),最终执行该计划(Execute),并返回结果。当然在实际的执行过程中,是按Operation-->Data Source-->Result的次序来进行的,和SQL语句的次序刚好相反;在执行过程有时候甚至不需要读取物理表就可以返回结果,比如重新运行刚运行过的SQL语句,可能直接从数据库的缓冲池中获取返回结果。       以上过程看上去非常简单,但实际上会包含很多复杂的操作细节在里面。而这些操作细节都和Tree有关,在数据库解析(Parse)SQL语句的时候,会将SQL语句转换成一个树型结构来进行处理,如下面一个查询,会形成一个含有多个节点(TreeNode)的Tree,然后在后续的处理过程中对该Tree进行一系列的操作。 gxlsystem.com,布布扣
下图给出了对Tree的一些可能的操作细节,对于Tree的处理过程中所涉及更多的细节,可以查看相关的数据库论文。 gxlsystem.com,布布扣

 OK,上面简单介绍了关系型数据库的运行过程,那么,sparkSQL是不是也采用类似的方式处理呢?答案是肯定的。下面我们先来看看sparkSQL中的两个重要概念Tree和Rule、然后再介绍一下sparkSQL的两个分支sqlContext和hiveContext、最后再综合看看sparkSQL的优化器Catalyst。
1:Tree和Rule       sparkSQL对SQL语句的处理和关系型数据库对SQL语句的处理采用了类似的方法,首先会将SQL语句进行解析(Parse),然后形成一个Tree,在后续的如绑定、优化等处理过程都是对Tree的操作,而操作的方法是采用Rule,通过模式匹配,对不同类型的节点采用不同的操作。 A:Tree



    B:Rule

            拿个简单的例子,在处理由解析器(SqlParse)生成的LogicPlan Tree的时候,在Analyzer中就定义了多种Rules应用到LogicPlan Tree上。       应用示意图:
      gxlsystem.com,布布扣
            Analyzer中使用的Rules,定义了batches,由多个batch构成,如MultiInstanceRelations、Resolution、Check Analysis、AnalysisOperators等构成;每个batch又有不同的rule构成,如Resolution由ResolveReferences 、ResolveRelations、ResolveSortReferences 、NewRelationInstances等构成;每个rule又有自己相对应的处理函数,可以具体参看Analyzer中的ResolveReferences 、ResolveRelations、ResolveSortReferences 、NewRelationInstances函数;同时要注意的是,不同的rule应用次数是不同的:如CaseInsensitiveAttributeReferences这个batch中rule只应用了一次(Once),而Resolution这个batch中的rule应用了多次(fixedPoint = FixedPoint(100),也就是说最多应用100次,除非前后迭代结果一致退出)。 gxlsystem.com,布布扣
      在整个sql语句的处理过程中,Tree和Rule相互配合,完成了解析、绑定(在sparkSQL中称为Analysis)、优化、物理计划等过程,最终生成可以执行的物理计划。 知道了sparkSQL的各个过程的基本处理方式,下面来看看sparkSQL的运行过程。sparkSQL有两个分支,sqlContext和hivecontext,sqlContext现在只支持sql语法解析器(SQL-92语法);hiveContext现在支持sql语法解析器和hivesql语法解析器,默认为hivesql语法解析器,用户可以通过配置切换成sql语法解析器,来运行hiveql不支持的语法,如select 1。关于sqlContext和hiveContext的具体应用请参看第六部分。
      2:sqlContext的运行过程       sqlContext是使用sqlContext.sql(sqlText)来提交用户sql语句:
      /**源自sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala  */
        def sql(sqlText: String): SchemaRDD = {
          if (dialect == "sql") {
            new SchemaRDD(this, parseSql(sqlText))   //parseSql(sqlText)对sql语句进行语法解析
          } else {
            sys.error(s"Unsupported SQL dialect: $dialect")
          }
        }
      sqlContext.sql的返回结果是SchemaRDD,调用了new SchemaRDD(this, parseSql(sqlText)) 来对sql语句进行处理,处理之前先使用catalyst.SqlParser对sql语句进行语法解析,使之生成UnresolvedLogicalPlan。
      /**源自sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala  */
        protected[sql] val parser = new catalyst.SqlParser
        protected[sql] def parseSql(sql: String): LogicalPlan = parser(sql)
      类SchemaRDD继承自SchemaRDDLike gxlsystem.com,布布扣
      /**源自sql/core/src/main/scala/org/apache/spark/sql/SchemaRDD.scala  */
      class SchemaRDD(
          @transient val sqlContext: SQLContext,
          @transient val baseLogicalPlan: LogicalPlan)
        extends RDD[Row](sqlContext.sparkContext, Nil) with SchemaRDDLike
      SchemaRDDLike中调用sqlContext.executePlan(baseLogicalPlan)来执行catalyst.SqlParser解析后生成UnresolvedLogicalPlan,这里的baseLogicalPlan就是指UnresolvedLogicalPlan。
      /**源自sql/core/src/main/scala/org/apache/spark/sql/SchemaRDDLike.scala  */
      private[sql] trait SchemaRDDLike {
        @transient val sqlContext: SQLContext
        @transient val baseLogicalPlan: LogicalPlan
        private[sql] def baseSchemaRDD: SchemaRDD
      
        lazy val queryExecution = sqlContext.executePlan(baseLogicalPlan)
      sqlContext.executePlan做了什么呢?它调用了QueryExecution类
      /**源自sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala  */
      protected[sql] def executePlan(plan: LogicalPlan): this.QueryExecution =
          new this.QueryExecution { val logical = plan }
      QueryExecution类的定义:
      /**源自sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala  */
      protected abstract class QueryExecution {
          def logical: LogicalPlan
         
          //对Unresolved LogicalPlan进行analyzer,生成resolved LogicalPlan
          lazy val analyzed = ExtractPythonUdfs(analyzer(logical))    
          //对resolved LogicalPlan进行optimizer,生成optimized LogicalPlan
          lazy val optimizedPlan = optimizer(analyzed)  
          // 将optimized LogicalPlan转换成PhysicalPlan
          lazy val sparkPlan = {
            SparkPlan.currentContext.set(self)
            planner(optimizedPlan).next()
          }
          // PhysicalPlan执行前的准备工作,生成可执行的物理计划
          lazy val executedPlan: SparkPlan = prepareForExecution(sparkPlan)
      
          //执行可执行物理计划
          lazy val toRdd: RDD[Row] = executedPlan.execute()
      
          ......
        }
      sqlContext总的一个过程如下图所示:



        3:hiveContext的运行过程       在分布式系统中,由于历史原因,很多数据已经定义了hive的元数据,通过这些hive元数据,sparkSQL使用hiveContext很容易实现对这些数据的访问。值得注意的是hiveContext继承自sqlContext,所以在hiveContext的的运行过程中除了override的函数和变量,可以使用和sqlContext一样的函数和变量。       从sparkSQL1.1开始,hiveContext使用hiveContext.sql(sqlText)来提交用户sql语句进行查询:
        /**源自sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala  */
        override def sql(sqlText: String): SchemaRDD = {
            // 使用spark.sql.dialect定义采用的语法解析器
            if (dialect == "sql") {
              super.sql(sqlText)    //如果使用sql解析器,则使用sqlContext的sql方法
            } else if (dialect == "hiveql") {     //如果使用和hiveql解析器,则使用HiveQl.parseSql
              new SchemaRDD(this, HiveQl.parseSql(sqlText))
            }  else {
              sys.error(s"Unsupported SQL dialect: $dialect.  Try 'sql' or 'hiveql'")
            }
          }
        hiveContext.sql首先根据用户的语法设置(spark.sql.dialect)决定具体的执行过程,如果dialect == "sql"则采用sqlContext的sql语法执行过程;如果是dialect == "hiveql",则采用hiveql语法执行过程。在这里我们主要看看hiveql语法执行过程。可以看出,hiveContext.sql调用了new SchemaRDD(this, HiveQl.parseSql(sqlText))对hiveql语句进行处理,处理之前先使用对语句进行语法解析。
        /**源自src/main/scala/org/apache/spark/sql/hive/HiveQl.scala  */
          /** Returns a LogicalPlan for a given HiveQL string. */
          def parseSql(sql: String): LogicalPlan = {
            try {
              if (条件)   {
              //非hive命令的处理,如set、cache table、add jar等直接转化成command类型的LogicalPlan
              .....  
              } else {
                val tree = getAst(sql)
                if (nativeCommands contains tree.getText) {
                  NativeCommand(sql)
                } else {
                  nodeToPlan(tree) match {
                    case NativePlaceholder => NativeCommand(sql)
                    case other => other
                  }
                }
              }
            } catch {
              //异常处理
              ......
            }
          }
        因为sparkSQL所支持的hiveql除了兼容hive语句外,还兼容一些sparkSQL本身的语句,所以在HiveQl.parseSql对hiveql语句语法解析的时候:
          /**源自src/main/scala/org/apache/spark/sql/hive/HiveQl.scala */ /** * Returns the AST for the given SQL string. */ def getAst(sql: String): ASTNode = ParseUtils.findRootNonNullToken((new ParseDriver).parse(sql))和sqlContext一样,类SchemaRDD继承自SchemaRDDLike ,SchemaRDDLike调用sqlContext.executePlan(baseLogicalPlan),不过hiveContext重写了executePlan()函数,
          /**源自sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala  */ 
          override protected[sql] def executePlan(plan: LogicalPlan): this.QueryExecution =
              new this.QueryExecution { val logical = plan }
          并使用了一个继承自sqlContext.QueryExecution的新的QueryExecution类:
          /**源自sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala  */
           protected[sql] abstract class QueryExecution extends super.QueryExecution {
              // TODO: Create mixin for the analyzer instead of overriding things here.
              override lazy val optimizedPlan =
                optimizer(ExtractPythonUdfs(catalog.PreInsertionCasts(catalog.CreateTables(analyzed))))
          
              override lazy val toRdd: RDD[Row] = executedPlan.execute().map(_.copy())
              ......
            }
          所以在hiveContext的运行过程基本和sqlContext一致,除了override的catalog、functionRegistry、analyzer、planner、optimizedPlan、toRdd。 hiveContext的catalog,是指向 Hive Metastore:
          /**源自sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala  */
          /* A catalyst metadata catalog that points to the Hive Metastore. */
            @transient
            override protected[sql] lazy val catalog = new HiveMetastoreCatalog(this) with OverrideCatalog {
              override def lookupRelation(
                databaseName: Option[String],
                tableName: String,
                alias: Option[String] = None): LogicalPlan = {
          
                LowerCaseSchema(super.lookupRelation(databaseName, tableName, alias))
              }
            }
          hiveContext的analyzer,使用了新的catalog和functionRegistry:
          /**源自sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala  */
            /* An analyzer that uses the Hive metastore. */
            @transient
            override protected[sql] lazy val analyzer =
              new Analyzer(catalog, functionRegistry, caseSensitive = false)
          hiveContext的planner,使用新定义的hivePlanner:
          /**源自sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala  */
            @transient
            override protected[sql] val planner = hivePlanner
          所以hiveContext总的一个过程如下图所示:

              4:catalyst优化器       sparkSQL1.1总体上由四个模块组成:core、catalyst、hive、hive-Thriftserver:

               其中虚线部分是以后版本要实现的功能,实线部分是已经实现的功能。从上图看,catalyst主要的实现组件有:
              • sqlParse,完成sql语句的语法解析功能,目前只提供了一个简单的sql解析器;
              • Analyzer,主要完成绑定工作,将不同来源的UnresolvedLogicalPlan 和数据元数据(如hive metastore、Schema catalog)进行绑定,生成resolvedLogicalPlan;
              • optimizer对resolvedLogicalPlan进行优化,生成optimizedLogicalPlan;
              • Planner将LogicalPlan转换成PhysicalPlan;
              • CostModel,主要根据过去的性能统计数据,选择最佳的物理执行计划
              这些组件的基本实现方法:
              • 先将sql语句通过解析生成Tree,然后在不同阶段使用不同的Rule应用到Tree上,通过转换完成各个组件的功能。
              • Analyzer使用Analysis Rules,配合数据元数据(如hive metastore、Schema catalog),完善UnresolvedLogicalPlan的属性而转换成resolvedLogicalPlan;
              • optimizer使用Optimization Rules,对resolvedLogicalPlan进行合并、列裁剪、过滤器下推等优化作业而转换成optimizedLogicalPlan;
              • Planner使用Planning Strategies,对optimizedLogicalPlan
              关于本篇中涉及到的相关概念和组件在下篇再详细介绍。

热门排行

今日推荐

热门手游