时间:2021-07-01 10:21:17 帮助过:38人阅读
作用
② SimpleFetchOptimizer
优化没有GroupBy表达式的聚合查询
② MapJoinProcessor
MapJoin,需要SQL中提供hint,0.11版本已不用
② BucketMapJoinOptimizer
BucketMapJoin
② GroupByOptimizer
Map端聚合
① ReduceSinkDeDuplication
合并线性的OperatorTree中partition/sort key相同的reduce
① PredicatePushDown
谓词前置
① CorrelationOptimizer
利用查询中的相关性,合并有相关性的Job,HIVE-2206
ColumnPruner
字段剪枝
表格中①的优化器均是一个Job干尽可能多的事情/合并。②的都是减少shuffle数据量,甚至不做Reduce。
CorrelationOptimizer优化器非常复杂,都能利用查询中的相关性,合并有相关性的Job,参考 Hive Correlation Optimizer
对于样例SQL,有两个优化器对其进行优化。下面分别介绍这两个优化器的作用,并补充一个优化器ReduceSinkDeDuplication的作用
断言判断提前优化器将OperatorTree中的FilterOperator提前到TableScanOperator之后
NonBlockingOpDeDupProc
优化器合并SEL-SEL 或者 FIL-FIL 为一个Operator
ReduceSinkDeDuplication可以合并线性相连的两个RS。实际上CorrelationOptimizer是ReduceSinkDeDuplication的超集,能合并线性和非线性的操作RS,但是Hive先实现的ReduceSinkDeDuplication
譬如下面这条SQL语句
from (select key, value from src group by key, value) s select s.key group by s.key;
经过前面几个阶段之后,会生成如下的OperatorTree,两个Tree是相连的,这里没有画到一起
这时候遍历OperatorTree后能发现前前后两个RS输出的Key值和PartitionKey如下
|
Key |
PartitionKey |
---|---|---|
childRS |
key |
key |
parentRS |
key,value |
key,value |
ReduceSinkDeDuplication优化器检测到:1. pRS Key完全包含cRS Key,且排序顺序一致;2. pRS PartitionKey完全包含cRS PartitionKey。符合优化条件,会对执行计划进行优化。
ReduceSinkDeDuplication将childRS和parentheRS与childRS之间的Operator删掉,保留的RS的Key为key,value字段,PartitionKey为key字段。合并后的OperatorTree如下:
OperatorTree转化为MapReduce Job的过程分为下面几个阶段
由上一步OperatorTree只生成了一个FileSinkOperator,直接生成一个MoveTask,完成将最终生成的HDFS临时文件移动到目标表目录下
MoveTask[Stage-0]
Move Operator
将OperatorTree中的所有根节点保存在一个toWalk的数组中,循环取出数组中的元素(省略QB1,未画出)
取出最后一个元素TS[p]放入栈 opStack{TS[p]}中
发现栈中的元素符合下面规则R1(这里用python代码简单表示)
"".join([t + "%" for t in opStack]) == "TS%"
生成一个MapReduceTask[Stage-1]
对象,MapReduceTask[Stage-1]
对象的MapWork
属性保存Operator根节点的引用。由于OperatorTree之间之间的Parent Child关系,这个时候MapReduceTask[Stage-1]
包含了以TS[p]
为根的所有Operator
继续遍历TS[p]的子Operator,将子Operator存入栈opStack中
当第一个RS进栈后,即栈opStack = {TS[p], FIL[18], RS[4]}时,就会满足下面的规则R2
"".join([t + "%" for t in opStack]) == "TS%.*RS%"
这时候在MapReduceTask[Stage-1]
对象的ReduceWork
属性保存JOIN[5]
的引用
继续遍历JOIN[5]的子Operator,将子Operator存入栈opStack中
当第二个RS放入栈时,即当栈opStack = {TS[p], FIL[18], RS[4], JOIN[5], RS[6]}
时,就会满足下面的规则R3
"".join([t + "%" for t in opStack]) == “RS%.*RS%” //循环遍历opStack的每一个后缀数组
这时候创建一个新的MapReduceTask[Stage-2]
对象,将OperatorTree从JOIN[5]
和RS[6]
之间剪开,并为JOIN[5]
生成一个子Operator FS[19]
,RS[6]
生成一个TS[20]
,MapReduceTask[Stage-2]
对象的MapWork
属性保存TS[20]
的引用。
新生成的FS[19]
将中间数据落地,存储在HDFS临时文件中。
继续遍历RS[6]的子Operator,将子Operator存入栈opStack中
当opStack = {TS[p], FIL[18], RS[4], JOIN[5], RS[6], JOIN[8], SEL[10], GBY[12], RS[13]}
时,又会满足R3规则
同理生成MapReduceTask[Stage-3]
对象,并切开 Stage-2 和 Stage-3 的OperatorTree
最终将所有子Operator存入栈中之后,opStack = {TS[p], FIL[18], RS[4], JOIN[5], RS[6], JOIN[8], SEL[10], GBY[12], RS[13], GBY[14], SEL[15], FS[17]}
满足规则R4
"".join([t + "%" for t in opStack]) == “FS%”
这时候将MoveTask
与MapReduceTask[Stage-3]
连接起来,并生成一个StatsTask
,修改表的元信息
此时并没有结束,还有两个根节点没有遍历。
将opStack栈清空,将toWalk的第二个元素加入栈。会发现opStack = {TS[du]}
继续满足R1 TS%,生成MapReduceTask[Stage-5]
继续从TS[du]
向下遍历,当opStack={TS[du], RS[7]}
时,满足规则R2 TS%.*RS%
此时将JOIN[8]
保存为MapReduceTask[Stage-5]
的ReduceWork
时,发现在一个Map对象保存的Operator与MapReduceWork对象关系的Map<Operator, MapReduceWork>
对象中发现,JOIN[8]
已经存在。此时将MapReduceTask[Stage-2]
和MapReduceTask[Stage-5]
合并为一个MapReduceTask
同理从最后一个根节点TS[c]
开始遍历,也会对MapReduceTask进行合并
最后一个阶段,将MapWork和ReduceWork中的OperatorTree以RS为界限剪开
最终共生成3个MapReduceTask,如下图
这里不详细介绍每个优化器的原理,单独介绍一下MapJoin的优化器
名称 |
作用 |
---|---|
Vectorizer |
HIVE-4160,将在0.13中发布 |
SortMergeJoinResolver |
与bucket配合,类似于归并排序 |
SamplingOptimizer |
并行order by优化器,在0.12中发布 |
CommonJoinResolver + MapJoinResolver |
MapJoin优化器 |
MapJoin简单说就是在Map阶段将小表读入内存,顺序扫描大表完成Join。
上图是Hive MapJoin的原理图,出自Facebook工程师Liyin Tang的一篇介绍Join优化的slice,从图中可以看出MapJoin分为两个阶段:
通过MapReduce Local Task,将小表读入内存,生成HashTableFiles上传至Distributed Cache中,这里会对HashTableFiles进行压缩。
MapReduce Job在Map阶段,每个Mapper从Distributed Cache读取HashTableFiles到内存中,顺序扫描大表,在Map阶段直接进行Join,将数据传递给下一个MapReduce任务。
如果Join的两张表一张表是临时表,就会生成一个ConditionalTask,在运行期间判断是否使用MapJoin
CommonJoinResolver优化器就是将CommonJoin转化为MapJoin,转化过程如下
遍历上一个阶段生成的MapReduce任务,发现MapReduceTask[Stage-2]
JOIN[8]
中有一张表为临时表,先对Stage-2进行深度拷贝(由于需要保留原始执行计划为Backup Plan,所以这里将执行计划拷贝了一份),生成一个MapJoinOperator替代JoinOperator,然后生成一个MapReduceLocalWork读取小表生成HashTableFiles上传至DistributedCache中。
MapReduceTask经过变换后的执行计划如下图所示
MapJoinResolver优化器遍历Task Tree,将所有有local work的MapReduceTask拆成两个Task
最终MapJoinResolver处理完之后,执行计划如下图所示
从上述整个SQL编译的过程,可以看出编译过程的设计有几个优点值得学习和借鉴
Hive依然在迅速的发展中,为了提升Hive的性能,hortonworks公司主导的Stinger计划提出了一系列对Hive的改进,比较重要的改进有:
我们也将跟进社区的发展,结合自身的业务需要,提升Hive型ETL流程的性能
Antlr: http://www.antlr.org/
Wiki Antlr介绍: http://en.wikipedia.org/wiki/ANTLR
Hive Wiki: https://cwiki.apache.org/confluence/display/Hive/Home
HiveSQL编译过程: http://www.slideshare.net/recruitcojp/internal-hive
Join Optimization in Hive: Join Strategies in Hive from the 2011 Hadoop Summit (Liyin Tang, Namit Jain)
Hive Design Docs: https://cwiki.apache.org/confluence/display/Hive/DesignDocs
from: https://tech.meituan.com/hive-sql-to-mapreduce.html
Hive SQL的编译过程
标签:slice book exec 第一个 filter 临时文件 expr getconf 表数据