Spark SQL Source Code Analysis: The Concrete Implementation from Physical Plan to RDD
LeftSemiJoinHash implements LEFT SEMI JOIN by building a hash set of keys from the right table (the build side) and using it to filter the left table (the streamed side):

case class LeftSemiJoinHash(
    leftKeys: Seq[Expression],
    rightKeys: Seq[Expression],
    left: SparkPlan,
    right: SparkPlan) extends BinaryNode with HashJoin {

  val buildSide = BuildRight // the build side is the right table

  override def requiredChildDistribution =
    ClusteredDistribution(leftKeys) :: ClusteredDistribution(rightKeys) :: Nil

  override def output = left.output

  def execute() = {
    // Executing the right table's physical plan yields an RDD; zipPartitions pairs
    // its partitions with the streamed side's, and each pair is joined as below.
    buildPlan.execute().zipPartitions(streamedPlan.execute()) { (buildIter, streamIter) =>
      val hashSet = new java.util.HashSet[Row]()
      var currentRow: Row = null

      // Build a hash set of the build-side keys, skipping rows with null keys
      while (buildIter.hasNext) {
        currentRow = buildIter.next()
        val rowKey = buildSideKeyGenerator(currentRow)
        if (!rowKey.anyNull) {
          val keyExists = hashSet.contains(rowKey)
          if (!keyExists) {
            hashSet.add(rowKey)
          }
        }
      }

      // Keep only the streamed rows whose join key appears in the hash set
      val joinKeys = streamSideKeyGenerator()
      streamIter.filter(current => {
        !joinKeys(current).anyNull && hashSet.contains(joinKeys.currentValue)
      })
    }
  }
}
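To see the semi-join logic in isolation, here is a minimal sketch in plain Scala. The two-phase build/probe structure mirrors the code above, but the data, the string keys, and the object name are hypothetical illustrations, not Spark SQL code:

// Minimal sketch of the hash-set semi join, independent of Spark internals.
// Data and key types here are made up for illustration.
object SemiJoinSketch {
  def main(args: Array[String]): Unit = {
    val build  = Iterator(("a", 1), ("b", 2), ("b", 3))    // right (build) side
    val stream = Iterator(("a", 10), ("c", 20), ("b", 30)) // left (streamed) side

    // Phase 1: collect the distinct build-side keys into a hash set
    val hashSet = new java.util.HashSet[String]()
    build.foreach { case (key, _) => hashSet.add(key) }

    // Phase 2: keep only the streamed rows whose key exists in the set
    val result = stream.filter { case (key, _) => hashSet.contains(key) }
    result.foreach(println) // prints (a,10) and (b,30); (c,20) is filtered out
  }
}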
BroadcastHashJoin collects the build-side table on the driver, broadcasts it to every executor, and joins it against each partition of the streamed side:

case class BroadcastHashJoin(
    leftKeys: Seq[Expression],
    rightKeys: Seq[Expression],
    buildSide: BuildSide,
    left: SparkPlan,
    right: SparkPlan)(@transient sqlContext: SQLContext)
  extends BinaryNode with HashJoin {

  override def otherCopyArgs = sqlContext :: Nil

  override def outputPartitioning: Partitioning = left.outputPartitioning

  override def requiredChildDistribution =
    UnspecifiedDistribution :: UnspecifiedDistribution :: Nil

  @transient
  lazy val broadcastFuture = future {
    // Broadcast the collected build-side table through the SparkContext
    sqlContext.sparkContext.broadcast(buildPlan.executeCollect())
  }

  def execute() = {
    val broadcastRelation = Await.result(broadcastFuture, 5.minute)

    streamedPlan.execute().mapPartitions { streamedIter =>
      // Call joinIterators on every streamed partition via mapPartitions
      joinIterators(broadcastRelation.value.iterator, streamedIter)
    }
  }
}
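The same pattern can be sketched against the public RDD API: collect the small side on the driver, broadcast it, and probe it from every partition of the large side. This is a hedged sketch, not Spark SQL's joinIterators; the data sets, key types, and the explicit map probe are assumptions made for illustration:

import org.apache.spark.{SparkConf, SparkContext}

// Sketch of a broadcast hash join using only the public RDD API.
// joinIterators is replaced by an explicit probe of the broadcast map;
// data and table contents are hypothetical.
object BroadcastJoinSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("broadcast-join").setMaster("local[2]"))

    val small = sc.parallelize(Seq((1, "x"), (2, "y")))              // build side
    val large = sc.parallelize(Seq((1, 10.0), (2, 20.0), (3, 30.0))) // streamed side

    // Collect and broadcast the build side, mirroring buildPlan.executeCollect()
    val broadcastMap = sc.broadcast(small.collectAsMap())

    // Probe the broadcast map once per streamed row; only the small table
    // crosses the network, so the large side is never shuffled.
    val joined = large.mapPartitions { iter =>
      val m = broadcastMap.value
      iter.flatMap { case (k, v) => m.get(k).map(s => (k, (v, s))) }
    }

    joined.collect().foreach(println) // (1,(10.0,x)) and (2,(20.0,y))
    sc.stop()
  }
}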
ShuffledHashJoin shuffles both children by the join keys, then zips each pair of co-partitioned partitions with the RDD zipPartitions method and hash-joins them. Its requiredChildDistribution is ClusteredDistribution over the join keys, which is matched against HashPartitioning during planning; the partitioning rules are not repeated here and can be found in Partitioning under org.apache.spark.sql.catalyst.plans.physical.

case class ShuffledHashJoin(
    leftKeys: Seq[Expression],
    rightKeys: Seq[Expression],
    buildSide: BuildSide,
    left: SparkPlan,
    right: SparkPlan) extends BinaryNode with HashJoin {

  override def outputPartitioning: Partitioning = left.outputPartitioning

  override def requiredChildDistribution =
    ClusteredDistribution(leftKeys) :: ClusteredDistribution(rightKeys) :: Nil

  def execute() = {
    // Zip the build and streamed RDDs partition by partition, then hash-join each pair
    buildPlan.execute().zipPartitions(streamedPlan.execute()) { (buildIter, streamIter) =>
      joinIterators(buildIter, streamIter)
    }
  }
}
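To make the zipPartitions contract concrete, here is a hedged standalone sketch at the RDD level: both RDDs must have the same number of partitions, and the closure receives one iterator per side for every partition pair, which is exactly the shape joinIterators consumes above. The data and the element pairing are hypothetical:

import org.apache.spark.{SparkConf, SparkContext}

// Sketch of zipPartitions: the two RDDs are aligned partition by partition,
// and the closure sees one iterator per side. Data here is made up.
object ZipPartitionsSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("zip-partitions").setMaster("local[2]"))

    val buildSide  = sc.parallelize(Seq(1, 2, 3, 4), numSlices = 2)
    val streamSide = sc.parallelize(Seq(10, 20, 30, 40), numSlices = 2)

    // Analogous to buildPlan.execute().zipPartitions(streamedPlan.execute()) { ... }
    val zipped = buildSide.zipPartitions(streamSide) { (buildIter, streamIter) =>
      // Spark SQL would call joinIterators(buildIter, streamIter) here;
      // we just pair elements to show the per-partition view.
      buildIter.zip(streamIter)
    }

    zipped.collect().foreach(println) // e.g. (1,10), (2,20), (3,30), (4,40)
    sc.stop()
  }
}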
This is an original article; please credit the source when reposting.
Source: the blog of OopsOutOfMemory (盛利). Author: OopsOutOfMemory
Original link: http://blog.csdn.net/oopsoom/article/details/38274621
Note: this article is released under the Attribution-NonCommercial-NoDerivatives 2.5 China Mainland license (CC BY-NC-ND 2.5 CN). Reposting, sharing, and comments are welcome, but please keep the author attribution and the link to this article. For commercial use or other licensing inquiries, please contact me.