```scala
object Optimizer extends RuleExecutor[LogicalPlan] {
  val batches =
    Batch("Combine Limits", FixedPoint(100),
      CombineLimits) ::
    Batch("ConstantFolding", FixedPoint(100),
      NullPropagation,
      ConstantFolding,
      BooleanSimplification,
      SimplifyFilters,
      SimplifyCasts,
      SimplifyCaseConversionExpressions) ::
    Batch("Filter Pushdown", FixedPoint(100),
      CombineFilters,
      PushPredicateThroughProject,
      PushPredicateThroughJoin,
      ColumnPruning) :: Nil
}
```
One more point: the Optimizer optimizes not only the LogicalPlan itself but also the Expressions inside it, so it helps to know the Expression-related classes. The two members used most here are references and outputSet: references is the set of Expressions (Attributes) that a LogicalPlan or Expression node depends on, while outputSet is the set of all Attributes the LogicalPlan outputs.
For example, Aggregate is a LogicalPlan, and its references is the deduplicated union of the references of its grouping expressions and its aggregate expressions:
```scala
case class Aggregate(
    groupingExpressions: Seq[Expression],
    aggregateExpressions: Seq[NamedExpression],
    child: LogicalPlan)
  extends UnaryNode {

  override def output = aggregateExpressions.map(_.toAttribute)
  override def references =
    (groupingExpressions ++ aggregateExpressions).flatMap(_.references).toSet
}
```
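To make references and output concrete, here is a small hypothetical sketch against the Spark 1.0 Catalyst API (LocalRelation is used here purely as a stand-in test relation), modeling SELECT key, SUM(value) AS total FROM t GROUP BY key:

```scala
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.catalyst.types._

// Two input columns standing in for table t
val key   = AttributeReference("key", IntegerType, nullable = true)()
val value = AttributeReference("value", IntegerType, nullable = true)()
val t     = LocalRelation(Seq(key, value))

// SELECT key, SUM(value) AS total FROM t GROUP BY key
val agg = Aggregate(Seq(key), Seq(key, Alias(Sum(value), "total")()), t)

agg.references // Set(key, value): every Attribute the node depends on
agg.output     // Seq(key, total): every Attribute the node emits
```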
2. Optimization Strategies in Detail
The Optimizer's strategies transform both plans and expressions. The underlying mechanism is the same in both cases: traverse the tree and apply the optimization Rules at each node. One detail worth noting: transform on a LogicalPlan is a pre-order traversal (transformDown), while the expression rewrites in these rules are mostly post-order (transformExpressionsUp); ConstantFolding is the exception, using transformExpressionsDown.
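To see why the traversal direction matters, consider this illustrative sketch (the rewrite rule below is written just for the example; it is not one of the Optimizer's rules):

```scala
import org.apache.spark.sql.catalyst.expressions._

// (1 + 2) + 3
val expr = Add(Add(Literal(1), Literal(2)), Literal(3))

// Post-order: children are rewritten first, so when the rule reaches the
// root Add, both of its children are already Literals and it folds fully.
val folded = expr transformUp {
  case Add(Literal(a: Int, _), Literal(b: Int, _)) => Literal(a + b)
}
// folded == Literal(6)

// Pre-order: the root is visited while its left child is still an Add,
// so the same rule only folds the inner node and stops there.
val partial = expr transformDown {
  case Add(Literal(a: Int, _), Literal(b: Int, _)) => Literal(a + b)
}
// partial == Add(Literal(3), Literal(3))
```

This is why rules such as NullPropagation and BooleanSimplification use transformExpressionsUp: a bottom-up pass lets simplifications at the leaves feed the ones above them within a single traversal.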
2.1 Batch: Combine Limits
If two Limit operators appear and one is the grandchild of the other, the two are merged into a single Limit.
```scala
object CombineLimits extends Rule[LogicalPlan] {
  def apply(plan: LogicalPlan): LogicalPlan = plan transform {
    case ll @ Limit(le, nl @ Limit(ne, grandChild)) =>
      Limit(If(LessThan(ne, le), ne, le), grandChild)
  }
}
```
Given the SQL: val query = sql("select * from (select * from temp_shengli limit 100)a limit 10 ")
```scala
scala> query.queryExecution.analyzed
res12: org.apache.spark.sql.catalyst.plans.logical.LogicalPlan =
Limit 10
 Project [key#13,value#14]
  Limit 100
   Project [key#13,value#14]
    MetastoreRelation default, temp_shengli, None
```
The subquery takes limit 100 and the outer query limit 10. The subquery clearly does not need to fetch that many rows, since the outer query keeps only 10, so the optimizer merges Limit 10 and Limit 100 into a single Limit 10.
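To watch the rule in isolation, here is a small hypothetical sketch (rel is a made-up stand-in relation):

```scala
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.catalyst.types._

val rel  = LocalRelation(Seq(AttributeReference("key", IntegerType)()))
val plan = Limit(Literal(10), Limit(Literal(100), rel))

CombineLimits(plan)
// => Limit(If(LessThan(100, 10), 100, 10), rel),
//    which the ConstantFolding batch later reduces to Limit(10, rel)
```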
2.2 Batch: ConstantFolding
This batch contains the rules NullPropagation, ConstantFolding, BooleanSimplification, SimplifyFilters, SimplifyCasts, and SimplifyCaseConversionExpressions.
2.2.1 Rule: NullPropagation
First, a word about Literal: its companion apply can wrap a value of any of the basic types. (This is groundwork for the rules below.)
```scala
object Literal {
  def apply(v: Any): Literal = v match {
    case i: Int => Literal(i, IntegerType)
    case l: Long => Literal(l, LongType)
    case d: Double => Literal(d, DoubleType)
    case f: Float => Literal(f, FloatType)
    case b: Byte => Literal(b, ByteType)
    case s: Short => Literal(s, ShortType)
    case s: String => Literal(s, StringType)
    case b: Boolean => Literal(b, BooleanType)
    case d: BigDecimal => Literal(d, DecimalType)
    case t: Timestamp => Literal(t, TimestampType)
    case a: Array[Byte] => Literal(a, BinaryType)
    case null => Literal(null, NullType)
  }
}
```
Note that Literal is a LeafExpression; its core method is eval, which, given a Row, returns the expression's value:
```scala
case class Literal(value: Any, dataType: DataType) extends LeafExpression {
  override def foldable = true
  def nullable = value == null
  def references = Set.empty
  override def toString = if (value != null) value.toString else "null"
  type EvaluatedType = Any
  override def eval(input: Row): Any = value
}
```
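For instance (a trivial sketch, following directly from the two definitions above):

```scala
import org.apache.spark.sql.catalyst.expressions._

val lit = Literal("abc") // the companion apply picks StringType
lit.foldable             // true: the value is known without any input
lit.eval(null)           // "abc": eval ignores the input Row entirely
```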
Now let's see what NullPropagation does.
NullPropagation is an optimization that replaces expressions with equivalent Literal values where possible, and prevents NULLs from propagating through the SQL syntax tree.
```scala
object NullPropagation extends Rule[LogicalPlan] {
  def apply(plan: LogicalPlan): LogicalPlan = plan transform {
    case q: LogicalPlan => q transformExpressionsUp {
      case e @ Count(Literal(null, _)) => Cast(Literal(0L), e.dataType)
      case e @ Sum(Literal(c, _)) if c == 0 => Cast(Literal(0L), e.dataType)
      case e @ Average(Literal(c, _)) if c == 0 => Literal(0.0, e.dataType)
      case e @ IsNull(c) if !c.nullable => Literal(false, BooleanType)
      case e @ IsNotNull(c) if !c.nullable => Literal(true, BooleanType)
      case e @ GetItem(Literal(null, _), _) => Literal(null, e.dataType)
      case e @ GetItem(_, Literal(null, _)) => Literal(null, e.dataType)
      case e @ GetField(Literal(null, _), _) => Literal(null, e.dataType)
      case e @ Coalesce(children) => {
        val newChildren = children.filter(c => c match {
          case Literal(null, _) => false
          case _ => true
        })
        if (newChildren.length == 0) {
          Literal(null, e.dataType)
        } else if (newChildren.length == 1) {
          newChildren(0)
        } else {
          Coalesce(newChildren)
        }
      }
      case e @ If(Literal(v, _), trueValue, falseValue) => if (v == true) trueValue else falseValue
      case e @ In(Literal(v, _), list) if (list.exists(c => c match {
        case Literal(candidate, _) if candidate == v => true
        case _ => false
      })) => Literal(true, BooleanType)

      case e: BinaryArithmetic => e.children match {
        case Literal(null, _) :: right :: Nil => Literal(null, e.dataType)
        case left :: Literal(null, _) :: Nil => Literal(null, e.dataType)
        case _ => e
      }
      case e: BinaryComparison => e.children match {
        case Literal(null, _) :: right :: Nil => Literal(null, e.dataType)
        case left :: Literal(null, _) :: Nil => Literal(null, e.dataType)
        case _ => e
      }
      case e: StringRegexExpression => e.children match {
        case Literal(null, _) :: right :: Nil => Literal(null, e.dataType)
        case left :: Literal(null, _) :: Nil => Literal(null, e.dataType)
        case _ => e
      }
    }
  }
}
```
Given the SQL: val query = sql("select count(null) from temp_shengli where key is not null")
```scala
scala> query.queryExecution.analyzed
res6: org.apache.spark.sql.catalyst.plans.logical.LogicalPlan =
Aggregate [], [COUNT(null) AS c0#5L]
 Filter IS NOT NULL key#7
  MetastoreRelation default, temp_shengli, None
```
Applying NullPropagation rewrites COUNT(null) into CAST(0, LongType) via the first case of the rule:
```scala
scala> NullPropagation(query.queryExecution.analyzed)
res7: org.apache.spark.sql.catalyst.plans.logical.LogicalPlan =
Aggregate [], [CAST(0, LongType) AS c0#5L]
 Filter IS NOT NULL key#7
  MetastoreRelation default, temp_shengli, None
```
2.2.2 Rule: ConstantFolding
Constant folding is an Expression-level optimization: an expression that can be computed directly from constants is evaluated inside the plan itself, instead of generating objects to compute it at physical execution time:
```scala
object ConstantFolding extends Rule[LogicalPlan] {
  def apply(plan: LogicalPlan): LogicalPlan = plan transform {
    case q: LogicalPlan => q transformExpressionsDown {
      case l: Literal => l
      case e if e.foldable => Literal(e.eval(null), e.dataType)
    }
  }
}
```
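What gates this rule is the foldable flag: a Literal is foldable, and (if memory of the 1.0 code serves; treat this as a hedged sketch) a binary expression is foldable when both of its children are, so e.eval(null) can evaluate an entire constant subtree with no input Row:

```scala
import org.apache.spark.sql.catalyst.expressions._

// (1 + 2) + (3 + 4)
val e = Add(Add(Literal(1), Literal(2)), Add(Literal(3), Literal(4)))

e.foldable   // true: every leaf is a Literal
e.eval(null) // 10, computed once at optimization time
// ConstantFolding replaces the whole subtree with Literal(10)
```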
Given the SQL: val query = sql("select 1+2+3+4 from temp_shengli")
```scala
scala> query.queryExecution.analyzed
res23: org.apache.spark.sql.catalyst.plans.logical.LogicalPlan =
Project [(((1 + 2) + 3) + 4) AS c0#21]
 MetastoreRelation default, src, None
```
After optimization:
```scala
scala> query.queryExecution.optimizedPlan
res24: org.apache.spark.sql.catalyst.plans.logical.LogicalPlan =
Project [10 AS c0#21]
 MetastoreRelation default, src, None
```
2.2.3 Rule: BooleanSimplification
This rule optimizes boolean expressions, somewhat like short-circuit evaluation of boolean expressions in Java, and it is written quite elegantly.
If one side of a boolean expression already determines the result, the other side need not be computed at all; this is what boolean simplification means here.
The cases read naturally from the code: true AND x becomes x, false AND anything becomes false, true OR anything becomes true, and false OR x becomes x.
```scala
object BooleanSimplification extends Rule[LogicalPlan] {
  def apply(plan: LogicalPlan): LogicalPlan = plan transform {
    case q: LogicalPlan => q transformExpressionsUp {
      case and @ And(left, right) =>
        (left, right) match {
          case (Literal(true, BooleanType), r) => r
          case (l, Literal(true, BooleanType)) => l
          case (Literal(false, BooleanType), _) => Literal(false)
          case (_, Literal(false, BooleanType)) => Literal(false)
          case (_, _) => and
        }

      case or @ Or(left, right) =>
        (left, right) match {
          case (Literal(true, BooleanType), _) => Literal(true)
          case (_, Literal(true, BooleanType)) => Literal(true)
          case (Literal(false, BooleanType), r) => r
          case (l, Literal(false, BooleanType)) => l
          case (_, _) => or
        }
    }
  }
}
```
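A small hypothetical usage sketch (rel and the key column are invented for illustration):

```scala
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.catalyst.types._

val key = AttributeReference("key", IntegerType)()
val rel = LocalRelation(Seq(key))

// WHERE key > 100 AND true
val plan = Filter(And(GreaterThan(key, Literal(100)), Literal(true)), rel)

BooleanSimplification(plan)
// => Filter(key > 100, rel): the redundant true operand is dropped
```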
2.3 Batch: Filter Pushdown
The Filter Pushdown batch contains CombineFilters, PushPredicateThroughProject, PushPredicateThroughJoin, and ColumnPruning.
PS: the batch name Filter Pushdown arguably does not cover everything in it; ColumnPruning, for example, is column pruning rather than a filter optimization.
2.3.1 Rule: CombineFilters
Merges two adjacent Filters, in the same spirit as Combine Limits above; one Filter must be the grandchild of the other. Merging the two nodes reduces the depth of the tree and avoids paying the filtering cost twice.
```scala
object CombineFilters extends Rule[LogicalPlan] {
  def apply(plan: LogicalPlan): LogicalPlan = plan transform {
    case ff @ Filter(fc, nf @ Filter(nc, grandChild)) => Filter(And(nc, fc), grandChild)
  }
}
```
Given the SQL: val query = sql("select key from (select key from temp_shengli where key >100)a where key > 80 ")
Before optimization, one Filter is the grandchild of the other:
```scala
scala> query.queryExecution.analyzed
res25: org.apache.spark.sql.catalyst.plans.logical.LogicalPlan =
Project [key#27]
 Filter (key#27 > 80)
  Project [key#27]
   Filter (key#27 > 100)
    MetastoreRelation default, src, None
```
After optimization, the two filters are expressed as a single, more complex boolean expression:
```scala
scala> query.queryExecution.optimizedPlan
res26: org.apache.spark.sql.catalyst.plans.logical.LogicalPlan =
Project [key#27]
 Filter ((key#27 > 100) && (key#27 > 80))
  MetastoreRelation default, src, None
```
2.3.2 Filter Pushdown (PushPredicateThroughProject)
Filter pushdown moves a filter as close to the data source as possible: discarding unneeded rows earlier reduces the cost of everything above the filter.
Given the SQL: val query = sql("select key from (select * from temp_shengli)a where key>100")
The generated logical plan is:
```scala
scala> query.queryExecution.analyzed
res29: org.apache.spark.sql.catalyst.plans.logical.LogicalPlan =
Project [key#31]
 Filter (key#31 > 100)
  Project [key#31,value#32]
   MetastoreRelation default, src, None
```
The optimized plan:
```scala
scala> query.queryExecution.optimizedPlan
res30: org.apache.spark.sql.catalyst.plans.logical.LogicalPlan =
Project [key#31]
 Filter (key#31 > 100)
  MetastoreRelation default, src, None
```
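The rule that produces this, PushPredicateThroughProject, is not quoted in this post; the following is a simplified sketch of its shape, paraphrased from memory of the Spark 1.0 source (details may differ):

```scala
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.catalyst.rules.Rule

object PushPredicateThroughProjectSketch extends Rule[LogicalPlan] {
  def apply(plan: LogicalPlan): LogicalPlan = plan transform {
    // A Filter sitting on top of a Project: swap them, so rows are
    // filtered before the projection runs.
    case filter @ Filter(condition, project @ Project(fields, grandChild)) =>
      // If the Project defines aliases, substitute their definitions
      // back into the condition before pushing it down.
      val sourceAliases = fields.collect {
        case a @ Alias(child, _) => (a.toAttribute: Expression) -> child
      }.toMap
      val pushedCondition = condition transform {
        case a: Attribute if sourceAliases.contains(a) => sourceAliases(a)
      }
      project.copy(child = Filter(pushedCondition, grandChild))
  }
}
```

The alias substitution matters because the projection may rename expressions: a condition written against a renamed column must be rewritten in terms of the child's own output before it can legally move below the Project.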
2.3.3 Rule: ColumnPruning
Column pruning is one of the most commonly used optimizations: it removes columns that are selected but never actually needed.
Column pruning applies in three places, each illustrated with an example further below:
1. In an aggregation, the child's unreferenced columns can be pruned.
2. In a join, the left and right children can each be pruned to the columns actually referenced.
3. Adjacent Projects can be merged, keeping only the outer projection's columns.
```scala
object ColumnPruning extends Rule[LogicalPlan] {
  def apply(plan: LogicalPlan): LogicalPlan = plan transform {
    // Case 1: the Aggregate's child produces columns the Aggregate never
    // references; insert a Project keeping only the referenced ones.
    case a @ Aggregate(_, _, child) if (child.outputSet -- a.references).nonEmpty =>
      a.copy(child = Project(a.references.toSeq, child))

    // Case 2: prune each side of a Join down to the columns referenced
    // by the projection list or by the join condition.
    case Project(projectList, Join(left, right, joinType, condition)) =>
      val allReferences: Set[Attribute] =
        projectList.flatMap(_.references).toSet ++ condition.map(_.references).getOrElse(Set.empty)

      def prunedChild(c: LogicalPlan) =
        if ((c.outputSet -- allReferences.filter(c.outputSet.contains)).nonEmpty) {
          Project(allReferences.filter(c.outputSet.contains).toSeq, c)
        } else {
          c
        }

      Project(projectList, Join(prunedChild(left), prunedChild(right), joinType, condition))

    // Case 3: merge two adjacent Projects by substituting the inner
    // Project's aliases into the outer projection list.
    case Project(projectList1, Project(projectList2, child)) =>
      val aliasMap = projectList2.collect {
        case a @ Alias(e, _) => (a.toAttribute: Expression, a)
      }.toMap

      val substitutedProjection = projectList1.map(_.transform {
        case a if aliasMap.contains(a) => aliasMap(a)
      }).asInstanceOf[Seq[NamedExpression]]

      Project(substitutedProjection, child)

    // A Project that outputs exactly its child's output is a no-op; drop it.
    case Project(projectList, child) if child.output == projectList => child
  }
}
```
Here are three examples, one for each case:
1. Column pruning in an aggregation.
Given the SQL: val query = sql("SELECT 1+1 as shengli, key from (select key, value from temp_shengli)a group by key")
Before optimization:
```scala
res57: org.apache.spark.sql.catalyst.plans.logical.LogicalPlan =
Aggregate [key#51], [(1 + 1) AS shengli#49,key#51]
 Project [key#51,value#52]
  MetastoreRelation default, temp_shengli, None
```
After optimization, value#52 is pruned, since the Aggregate references only key#51 (ColumnPruning1 is the author's instrumented copy of the rule, used for debugging; the stray MetastoreRelation line is its debug print):
```scala
scala> ColumnPruning1(query.queryExecution.analyzed)
MetastoreRelation default, temp_shengli, None
res59: org.apache.spark.sql.catalyst.plans.logical.LogicalPlan =
Aggregate [key#51], [(1 + 1) AS shengli#49,key#51]
 Project [key#51]
  MetastoreRelation default, temp_shengli, None
```
2. Column pruning on the children of a join.
Given the SQL: val query = sql("select a.value qween from (select * from temp_shengli) a join (select * from temp_shengli)b on a.key =b.key ")
Before optimization:
```scala
scala> query.queryExecution.analyzed
res51: org.apache.spark.sql.catalyst.plans.logical.LogicalPlan =
Project [value#42 AS qween#39]
 Join Inner, Some((key#41 = key#43))
  Project [key#41,value#42]
   MetastoreRelation default, temp_shengli, None
  Project [key#43,value#44]
   MetastoreRelation default, temp_shengli, None
```
After optimization (ColumnPruning2 is the author's instrumented copy, used for debugging; the allReferences and MetastoreRelation lines are its debug prints):
```scala
scala> ColumnPruning2(query.queryExecution.analyzed)
allReferences is -> Set(key#35, key#37)
MetastoreRelation default, temp_shengli, None
MetastoreRelation default, temp_shengli, None
res47: org.apache.spark.sql.catalyst.plans.logical.LogicalPlan =
Project [key#35 AS qween#33]
 Join Inner, Some((key#35 = key#37))
  Project [key#35]
   MetastoreRelation default, temp_shengli, None
  Project [key#37]
   MetastoreRelation default, temp_shengli, None
```
3. Merging adjacent Projects, pruning the intermediate columns.
Given the SQL: val query = sql("SELECT c + 1 FROM (SELECT 1 + 1 as c from temp_shengli ) a ")
Before optimization:
```scala
scala> query.queryExecution.analyzed
res61: org.apache.spark.sql.catalyst.plans.logical.LogicalPlan =
Project [(c#56 + 1) AS c0#57]
 Project [(1 + 1) AS c#56]
  MetastoreRelation default, temp_shengli, None
```
After optimization, the two Projects have been merged and the constant folded:
```scala
scala> query.queryExecution.optimizedPlan
res62: org.apache.spark.sql.catalyst.plans.logical.LogicalPlan =
Project [(2 AS c#56 + 1) AS c0#57]
 MetastoreRelation default, temp_shengli, None
```
3. Summary
This post described the Optimizer's role in Catalyst: it takes the Analyzed Logical Plan and applies transform Rules to both the LogicalPlan and its Expressions, merging and optimizing the tree's nodes. The main strategies boil down to a few categories: operator combining (limits, filters, projects), column pruning, and filter pushdown.
Catalyst is under constant iteration; this post studies Spark 1.0.0 only, and optimization strategies added in later versions may be covered in follow-ups.
Discussion is welcome; let's improve together!
——EOF——
Original article; please credit when reposting:
From OopsOutOfMemory (Shengli)'s blog, author: OopsOutOfMemory
Link: http://blog.csdn.net/oopsoom/article/details/38121259
Note: this article is licensed under CC BY-NC-ND 2.5 CN (Attribution-NonCommercial-NoDerivs 2.5 China Mainland). Reposting, sharing, and comments are welcome, but please keep the author attribution and the article link. For commercial use or other licensing matters, please contact the author.