当前位置:Gxlcms > 数据库问题 > 第五篇:Spark SQL Catalyst源码分析之Optimizer

第五篇:Spark SQL Catalyst源码分析之Optimizer

时间:2021-07-01 10:21:17 帮助过:14人阅读

 
  1. object Optimizer extends RuleExecutor[LogicalPlan] {  
  2.   val batches =  
  3.     Batch("Combine Limits", FixedPoint(100),  
  4.       CombineLimits) ::  
  5.     Batch("ConstantFolding", FixedPoint(100),  
  6.       NullPropagation,  
  7.       ConstantFolding,  
  8.       BooleanSimplification,  
  9.       SimplifyFilters,  
  10.       SimplifyCasts,  
  11.       SimplifyCaseConversionExpressions) ::  
  12.     Batch("Filter Pushdown", FixedPoint(100),  
  13.       CombineFilters,  
  14.       PushPredicateThroughProject,  
  15.       PushPredicateThroughJoin,  
  16.       ColumnPruning) :: Nil  
  17. }  

  另外提一点,Optimizer里不但对Logical Plan进行了优化,而且对Logical Plan中的Expression也进行了优化,所以有必要了解一下Expression相关类,主要是用到了references和outputSet,references主要是Logical Plan或Expression节点的所依赖的那些Expressions,而outputSet是Logical Plan所有的Attribute的输出:

  如:Aggregate是一个Logical Plan, 它的references就是group by的表达式 和 aggreagate的表达式的并集去重。

[java] view plain copy  
  1. case class Aggregate(  
  2.     groupingExpressions: Seq[Expression],  
  3.     aggregateExpressions: Seq[NamedExpression],  
  4.     child: LogicalPlan)  
  5.   extends UnaryNode {  
  6.   
  7.   override def output = aggregateExpressions.map(_.toAttribute)  
  8.   override def references =  
  9.     (groupingExpressions ++ aggregateExpressions).flatMap(_.references).toSet  
  10. }  

 

  技术分享

二、优化策略详解

  Optimizer的优化策略不仅有对plan进行transform的,也有对expression进行transform的,究其原理就是遍历树,然后应用优化的Rule,但是注意一点,对Logical Plantransfrom的是先序遍历(pre-order),而对Expression transfrom的时候是后序遍历(post-order):

2.1、Batch: Combine Limits

如果出现了2个Limit,则将2个Limit合并为一个,这个要求一个Limit是另一个Limit的grandChild。

 

[java] view plain copy  
  1.  /** 
  2.  * Combines two adjacent [[Limit]] operators into one, merging the 
  3.  * expressions into one single expression. 
  4.  */  
  5. object CombineLimits extends Rule[LogicalPlan] {  
  6.   def apply(plan: LogicalPlan): LogicalPlan = plan transform {  
  7.     case ll @ Limit(le, nl @ Limit(ne, grandChild)) => //ll为当前Limit,le为其expression, nl是ll的grandChild,ne是nl的expression  
  8.       Limit(If(LessThan(ne, le), ne, le), grandChild) //expression比较,如果ne比le小则表达式为ne,否则为le  
  9.   }  
  10. }  
给定SQL:val query = sql("select * from (select * from temp_shengli limit 100)a limit 10 ") 
[java] view plain copy  
  1. scala> query.queryExecution.analyzed  
  2. res12: org.apache.spark.sql.catalyst.plans.logical.LogicalPlan =   
  3. Limit 10  
  4.  Project [key#13,value#14]  
  5.   Limit 100  
  6.    Project [key#13,value#14]  
  7.     MetastoreRelation default, temp_shengli, None  

 

子查询里limit100,外层查询limit10,这里我们当然可以在子查询里不必查那么多,因为外层只需要10个,所以这里会合并Limit10,和Limit100 为 Limit 10。

2.2、Batch: ConstantFolding

  这个Batch里包含了Rules:NullPropagation,ConstantFolding,BooleanSimplification,SimplifyFilters,SimplifyCasts,SimplifyCaseConversionExpressions。

2.2.1、Rule:NullPropagation

  这里先提一下Literal字面量,它其实是一个能匹配任意基本类型的类。(为下文做铺垫)

 

[java] view plain copy  
  1. object Literal {  
  2.   def apply(v: Any): Literal = v match {  
  3.     case i: Int => Literal(i, IntegerType)  
  4.     case l: Long => Literal(l, LongType)  
  5.     case d: Double => Literal(d, DoubleType)  
  6.     case f: Float => Literal(f, FloatType)  
  7.     case b: Byte => Literal(b, ByteType)  
  8.     case s: Short => Literal(s, ShortType)  
  9.     case s: String => Literal(s, StringType)  
  10.     case b: Boolean => Literal(b, BooleanType)  
  11.     case d: BigDecimal => Literal(d, DecimalType)  
  12.     case t: Timestamp => Literal(t, TimestampType)  
  13.     case a: Array[Byte] => Literal(a, BinaryType)  
  14.     case null => Literal(null, NullType)  
  15.   }  
  16. }  
  注意Literal是一个LeafExpression,核心方法是eval,给定Row,计算表达式返回值:

 

 

[java] view plain copy  
  1. case class Literal(value: Any, dataType: DataType) extends LeafExpression {  
  2.   override def foldable = true  
  3.   def nullable = value == null  
  4.   def references = Set.empty  
  5.   override def toString = if (value != null) value.toString else "null"  
  6.   type EvaluatedType = Any  
  7.   override def eval(input: Row):Any = value  
  8. }  
  现在来看一下NullPropagation都做了什么。

 

  NullPropagation是一个能将Expression Expressions替换为等价的Literal值的优化,并且能够避免NULL值在SQL语法树的传播。

 

[java] view plain copy  
  1. /** 
  2.  * Replaces [[Expression Expressions]] that can be statically evaluated with 
  3.  * equivalent [[Literal]] values. This rule is more specific with 
  4.  * Null value propagation from bottom to top of the expression tree. 
  5.  */  
  6. object NullPropagation extends Rule[LogicalPlan] {  
  7.   def apply(plan: LogicalPlan): LogicalPlan = plan transform {  
  8.     case q: LogicalPlan => q transformExpressionsUp {  
  9.       case e @ Count(Literal(null, _)) => Cast(Literal(0L), e.dataType) //如果count(null)则转化为count(0)  
  10.       case e @ Sum(Literal(c, _)) if c == 0 => Cast(Literal(0L), e.dataType)<span style="font-family: Arial;">//如果sum(null)则转化为sum(0)</span>  
  11.       case e @ Average(Literal(c, _)) if c == 0 => Literal(0.0, e.dataType)  
  12.       case e @ IsNull(c) if !c.nullable => Literal(false, BooleanType)  
  13.       case e @ IsNotNull(c) if !c.nullable => Literal(true, BooleanType)  
  14.       case e @ GetItem(Literal(null, _), _) => Literal(null, e.dataType)  
  15.       case e @ GetItem(_, Literal(null, _)) => Literal(null, e.dataType)  
  16.       case e @ GetField(Literal(null, _), _) => Literal(null, e.dataType)  
  17.       case e @ Coalesce(children) => {  
  18.         val newChildren = children.filter(c => c match {  
  19.           case Literal(null, _) => false  
  20.           case _ => true  
  21.         })  
  22.         if (newChildren.length == 0) {  
  23.           Literal(null, e.dataType)  
  24.         } else if (newChildren.length == 1) {  
  25.           newChildren(0)  
  26.         } else {  
  27.           Coalesce(newChildren)  
  28.         }  
  29.       }  
  30.       case e @ If(Literal(v, _), trueValue, falseValue) => if (v == true) trueValue else falseValue  
  31.       case e @ In(Literal(v, _), list) if (list.exists(c => c match {  
  32.           case Literal(candidate, _) if candidate == v => true  
  33.           case _ => false  
  34.         })) => Literal(true, BooleanType)  
  35.       // Put exceptional cases above if any  
  36.       case e: BinaryArithmetic => e.children match {  
  37.         case Literal(null, _) :: right :: Nil => Literal(null, e.dataType)  
  38.         case left :: Literal(null, _) :: Nil => Literal(null, e.dataType)  
  39.         case _ => e  
  40.       }  
  41.       case e: BinaryComparison => e.children match {  
  42.         case Literal(null, _) :: right :: Nil => Literal(null, e.dataType)  
  43.         case left :: Literal(null, _) :: Nil => Literal(null, e.dataType)  
  44.         case _ => e  
  45.       }  
  46.       case e: StringRegexExpression => e.children match {  
  47.         case Literal(null, _) :: right :: Nil => Literal(null, e.dataType)  
  48.         case left :: Literal(null, _) :: Nil => Literal(null, e.dataType)  
  49.         case _ => e  
  50.       }  
  51.     }  
  52.   }  
  53. }  
给定SQL: val query = sql("select count(null) from temp_shengli where key is not null")

 

 

[java] view plain copy  
  1. scala> query.queryExecution.analyzed  
  2. res6: org.apache.spark.sql.catalyst.plans.logical.LogicalPlan =   
  3. Aggregate [], [COUNT(null) AS c0#5L] //这里count的是null  
  4.  Filter IS NOT NULL key#7  
  5.   MetastoreRelation default, temp_shengli, None  
调用NullPropagation

 

 

[java] view plain copy  
  1. scala> NullPropagation(query.queryExecution.analyzed)  
  2. res7: org.apache.spark.sql.catalyst.plans.logical.LogicalPlan =   
  3. Aggregate [], [CAST(0, LongType) AS c0#5L]  //优化后为0了  
  4.  Filter IS NOT NULL key#7  
  5.   MetastoreRelation default, temp_shengli, None  

 

2.2.2、Rule:ConstantFolding 

  常量合并是属于Expression优化的一种,对于可以直接计算的常量,不用放到物理执行里去生成对象来计算了,直接可以在计划里就计算出来: [java] view plain copy  
  1. object ConstantFolding extends Rule[LogicalPlan] {  
  2.      def apply(plan: LogicalPlan): LogicalPlan = plan transform { //先对plan进行transform  
  3.        case q: LogicalPlan => q transformExpressionsDown { //对每个plan的expression进行transform  
  4.          // Skip redundant folding of literals.  
  5.          case l: Literal => l  
  6.          case e if e.foldable => Literal(e.eval(null), e.dataType) //调用eval方法计算结果  
  7.        }  
  8.      }  
  9.    }  
给定SQL: val query = sql("select 1+2+3+4 from temp_shengli")
[java] view plain copy  
  1. scala> query.queryExecution.analyzed  
  2. res23: org.apache.spark.sql.catalyst.plans.logical.LogicalPlan =   
  3. Project [(((1 + 2) + 3) + 4) AS c0#21]  //这里还是常量表达式  
  4.  MetastoreRelation default, src, None  
优化后:
[java] view plain copy  
  1. scala> query.queryExecution.optimizedPlan  
  2. res24: org.apache.spark.sql.catalyst.plans.logical.LogicalPlan =   
  3. Project [10 AS c0#21] //优化后,直接合并为10  
  4.  MetastoreRelation default, src, None  

2.2.3、BooleanSimplification

 

 这个是对布尔表达式的优化,有点像java布尔表达式中的短路判断,不过这个写的倒是很优雅。

 看看布尔表达式2边能不能通过只计算1边,而省去计算另一边而提高效率,称为简化布尔表达式。

 解释请看我写的注释:

 

[java] view plain copy  
  1. /** 
  2.  * Simplifies boolean expressions where the answer can be determined without evaluating both sides. 
  3.  * Note that this rule can eliminate expressions that might otherwise have been evaluated and thus 
  4.  * is only safe when evaluations of expressions does not result in side effects. 
  5.  */  
  6. object BooleanSimplification extends Rule[LogicalPlan] {  
  7.   def apply(plan: LogicalPlan): LogicalPlan = plan transform {  
  8.     case q: LogicalPlan => q transformExpressionsUp {  
  9.       case and @ And(left, right) => //如果布尔表达式是AND操作,即exp1 and exp2  
  10.         (left, right) match { //(左边表达式,右边表达式)  
  11.           case (Literal(true, BooleanType), r) => r // 左边true,返回右边的<span style="font-family: Arial;">bool</span><span style="font-family: Arial;">值</span>  
  12.           case (l, Literal(true, BooleanType)) => l //右边true,返回左边的bool值  
  13.           case (Literal(false, BooleanType), _) => Literal(false)//左边都false,右边随便,反正是返回false  
  14.           case (_, Literal(false, BooleanType)) => Literal(false)//只要有1边是false了,都是false  
  15.           case (_, _) => and  
  16.         }  
  17.   
  18.       case or @ Or(left, right) =>  
  19.         (left, right) match {  
  20.           case (Literal(true, BooleanType), _) => Literal(true) //只要左边是true了,不用判断右边都是true  
  21.           case (_, Literal(true, BooleanType)) => Literal(true) //只要有一边是true,都返回true  
  22.           case (Literal(false, BooleanType), r) => r //希望右边r是true  
  23.           case (l, Literal(false, BooleanType)) => l  
  24.           case (_, _) => or  
  25.         }  
  26.     }  
  27.   }  
  28. }  

 

2.3 Batch: Filter Pushdown

Filter Pushdown下包含了CombineFilters、PushPredicateThroughProject、PushPredicateThroughJoin、ColumnPruning Ps:感觉Filter Pushdown的名字起的有点不能涵盖全部比如ColumnPruning列裁剪。

 

2.3.1、Combine Filters

 合并两个相邻的Filter,这个和上述Combine Limit差不多。合并2个节点,就可以减少树的深度从而减少重复执行过滤的代价。

 

[java] view plain copy  
  1. /** 
  2.  * Combines two adjacent [[Filter]] operators into one, merging the 
  3.  * conditions into one conjunctive predicate. 
  4.  */  
  5. object CombineFilters extends Rule[LogicalPlan] {  
  6.   def apply(plan: LogicalPlan): LogicalPlan = plan transform {  
  7.     case ff @ Filter(fc, nf @ Filter(nc, grandChild)) => Filter(And(nc, fc), grandChild)  
  8.   }  
  9. }  
给定SQL:val query = sql("select key from (select key from temp_shengli where key >100)a where key > 80 ") 

 

优化前:我们看到一个filter 是另一个filter的grandChild

 

[java] view plain copy  
  1. scala> query.queryExecution.analyzed  
  2. res25: org.apache.spark.sql.catalyst.plans.logical.LogicalPlan =   
  3. Project [key#27]  
  4.  Filter (key#27 > 80) //filter>80  
  5.   Project [key#27]  
  6.    Filter (key#27 > 100) //filter>100  
  7.     MetastoreRelation default, src, None  
优化后:其实filter也可以表达为一个复杂的boolean表达式
[java] view plain copy  
  1. scala> query.queryExecution.optimizedPlan  
  2. res26: org.apache.spark.sql.catalyst.plans.logical.LogicalPlan =   
  3. Project [key#27]  
  4.  Filter ((key#27 > 100) && (key#27 > 80)) //合并为1个  
  5.   MetastoreRelation default, src, None  

 

2.3.2  Filter Pushdown 

 

  Filter Pushdown,过滤器下推。

  原理就是更早的过滤掉不需要的元素来减少开销。

  给定SQL:val query = sql("select key from (select * from temp_shengli)a where key>100")

  生成的逻辑计划为:

 

[java] view plain copy  
  1. scala> scala> query.queryExecution.analyzed  
  2. res29: org.apache.spark.sql.catalyst.plans.logical.LogicalPlan =   
  3. Project [key#31]  
  4.  Filter (key#31 > 100) //先select key, value,然后再Filter  
  5.   Project [key#31,value#32]  
  6.    MetastoreRelation default, src, None  
 优化后的计划为:

 

 

[java] view plain copy  
  1. query.queryExecution.optimizedPlan  
  2. res30: org.apache.spark.sql.catalyst.plans.logical.LogicalPlan =   
  3. Project [key#31]  
  4.  Filter (key#31 > 100) //先filter,然后再select  
  5.   MetastoreRelation default, src, None  

 

2.3.3、ColumnPruning

  列裁剪用的比较多,就是减少不必要select的某些列。   列裁剪在3种地方可以用:   1、在聚合操作中,可以做列裁剪   2、在join操作中,左右孩子可以做列裁剪   3、合并相邻的Project的列
[java] view plain copy  
  1. object ColumnPruning extends Rule[LogicalPlan] {  
  2.   def apply(plan: LogicalPlan): LogicalPlan = plan transform {  
  3.     // Eliminate attributes that are not needed to calculate the specified aggregates.  
  4.     case a @ Aggregate(_, _, child) if (child.outputSet -- a.references).nonEmpty => ////如果project的outputSet中减去a.references的元素如果不同,那么就将Aggreagte的child替换为a.references  
  5.       a.copy(child = Project(a.references.toSeq, child))  
  6.   
  7.     // Eliminate unneeded attributes from either side of a Join.  
  8.     case Project(projectList, Join(left, right, joinType, condition)) =>// 消除join的left 和 right孩子的不必要属性,将join的左右子树的列进行裁剪  
  9.       // Collect the list of off references required either above or to evaluate the condition.  
  10.       val allReferences: Set[Attribute] =  
  11.         projectList.flatMap(_.references).toSet ++ condition.map(_.references).getOrElse(Set.empty)  
  12.   
  13.       /** Applies a projection only when the child is producing unnecessary attributes */  
  14.       def prunedChild(c: LogicalPlan) =  
  15.         if ((c.outputSet -- allReferences.filter(c.outputSet.contains)).nonEmpty) {  
  16.           Project(allReferences.filter(c.outputSet.contains).toSeq, c)  
  17.         } else {  
  18.           c  
  19.         }  
  20.       Project(projectList, Join(prunedChild(left), prunedChild(right), joinType, condition))  
  21.   
  22.     // Combine adjacent Projects.  
  23.     case Project(projectList1, Project(projectList2, child)) => //合并相邻Project的列  
  24.       // Create a map of Aliases to their values from the child projection.  
  25.       // e.g., ‘SELECT ... FROM (SELECT a + b AS c, d ...)‘ produces Map(c -> Alias(a + b, c)).  
  26.       val aliasMap = projectList2.collect {  
  27.         case a @ Alias(e, _) => (a.toAttribute: Expression, a)  
  28.       }.toMap  
  29.   
  30.       // Substitute any attributes that are produced by the child projection, so that we safely  
  31.       // eliminate it.  
  32.       // e.g., ‘SELECT c + 1 FROM (SELECT a + b AS C ...‘ produces ‘SELECT a + b + 1 ...‘  
  33.       // TODO: Fix TransformBase to avoid the cast below.  
  34.       val substitutedProjection = projectList1.map(_.transform {  
  35.         case a if aliasMap.contains(a) => aliasMap(a)  
  36.       }).asInstanceOf[Seq[NamedExpression]]  
  37.   
  38.       Project(substitutedProjection, child)  
  39.   
  40.     // Eliminate no-op Projects  
  41.     case Project(projectList, child) if child.output == projectList => child  
  42.   }  
  43. }  
分别举三个例子来对应三种情况进行说明: 1、在聚合操作中,可以做列裁剪 给定SQL:val query = sql("SELECT 1+1 as shengli, key from (select key, value from temp_shengli)a group by key") 优化前:

 

 

[java] view plain copy  
  1. res57: org.apache.spark.sql.catalyst.plans.logical.LogicalPlan =   
  2. Aggregate [key#51], [(1 + 1) AS shengli#49,key#51]  
  3.  Project [key#51,value#52] //优化前默认select key 和 value两列  
  4.   MetastoreRelation default, temp_shengli, None  
优化后:

 

 

[java] view plain copy  
  1. scala> ColumnPruning1(query.queryExecution.analyzed)  
  2. MetastoreRelation default, temp_shengli, None  
  3. res59: org.apache.spark.sql.catalyst.plans.logical.LogicalPlan =   
  4. Aggregate [key#51], [(1 + 1) AS shengli#49,key#51]  
  5.  Project [key#51]  //优化后,列裁剪掉了value,只select key  
  6.   MetastoreRelation default, temp_shengli, None  

2、在join操作中,左右孩子可以做列裁剪

 

给定SQL:val query = sql("select a.value qween from (select * from temp_shengli) a join (select * from temp_shengli)b  on a.key =b.key ")
没有优化之前:

 

[java] view plain copy  
  1. scala> query.queryExecution.analyzed  
  2. res51: org.apache.spark.sql.catalyst.plans.logical.LogicalPlan =   
  3. Project [value#42 AS qween#39]  
  4.  Join Inner, Some((key#41 = key#43))  
  5.   Project [key#41,value#42]  //这里多select了一列,即value  
  6.    MetastoreRelation default, temp_shengli, None  
  7.   Project [key#43,value#44]  //这里多select了一列,即value  
  8.    MetastoreRelation default, temp_shengli, None  
优化后:(ColumnPruning2是我自己调试用的)
[java] view plain copy  
  1. scala> ColumnPruning2(query.queryExecution.analyzed)  
  2. allReferences is -> Set(key#35, key#37)  
  3. MetastoreRelation default, temp_shengli, None  
  4. MetastoreRelation default, temp_shengli, None  
  5. res47: org.apache.spark.sql.catalyst.plans.logical.LogicalPlan =   
  6. Project [key#35 AS qween#33]  
  7.  Join Inner, Some((key#35 = key#37))  
  8.   Project [key#35]   //经过列裁剪之后,left Child只需要select key这一个列  
  9.    MetastoreRelation default, temp_shengli, None  
  10.   Project [key#37]   //经过列裁剪之后,right Child只需要select key这一个列  
  11.    MetastoreRelation default, temp_shengli, None  
3、合并相邻的Project的列,裁剪

 

给定SQL:val query = sql("SELECT c + 1 FROM (SELECT 1 + 1 as c from temp_shengli ) a ")  

优化前:

 

[java] view plain copy  
  1. scala> query.queryExecution.analyzed  
  2. res61: org.apache.spark.sql.catalyst.plans.logical.LogicalPlan =   
  3. Project [(c#56 + 1) AS c0#57]  
  4.  Project [(1 + 1) AS c#56]  
  5.   MetastoreRelation default, temp_shengli, None  
优化后:

 

 

[java] view plain copy  
  1. scala> query.queryExecution.optimizedPlan  
  2. res62: org.apache.spark.sql.catalyst.plans.logical.LogicalPlan =   
  3. Project [(2 AS c#56 + 1) AS c0#57] //将子查询里的c 代入到 外层select里的c,直接计算结果  
  4.  MetastoreRelation default, temp_shengli, None  

 

三、总结:

 

  本文介绍了Optimizer在Catalyst里的作用即将Analyzed Logical Plan 经过对Logical Plan和Expression进行Rule的应用transfrom,从而达到树的节点进行合并和优化。其中主要的优化的策略总结起来是合并、列裁剪、过滤器下推几大类。

  Catalyst应该在不断迭代中,本文只是基于spark1.0.0进行研究,后续如果新加入的优化策略也会在后续补充进来。

  欢迎大家讨论,共同进步!

——EOF——

 

原创文章,转载请注明:

转载自:OopsOutOfMemory盛利的Blog,作者: OopsOutOfMemory

本文链接地址:http://blog.csdn.net/oopsoom/article/details/38121259

注:本文基于署名-非商业性使用-禁止演绎 2.5 中国大陆(CC BY-NC-ND 2.5 CN)协议,欢迎转载、转发和评论,但是请保留本文作者署名和文章链接。如若需要用于商业目的或者与授权方面的协商,请联系我。

技术分享

 转自:http://blog.csdn.net/oopsoom/article/details/38121259 

第五篇:Spark SQL Catalyst源码分析之Optimizer

标签:select   返回值   应该   pac   fonts   spark sql   10个   ssi   specified   

人气教程排行