当前位置:Gxlcms > 数据库问题 > Spark SQL 源代码分析之Physical Plan 到 RDD的详细实现

Spark SQL 源代码分析之Physical Plan 到 RDD的详细实现

时间:2021-07-01 10:21:17 帮助过:35人阅读

的版本号。 将右表的join keys放到HashSet里。然后遍历左表,查找左表的join key能否匹配。
case class LeftSemiJoinHash(
    leftKeys: Seq[Expression],
    rightKeys: Seq[Expression],
    left: SparkPlan,
    right: SparkPlan) extends BinaryNode with HashJoin {

  val buildSide = BuildRight //buildSide是以右表为基准

  override def requiredChildDistribution =
    ClusteredDistribution(leftKeys) :: ClusteredDistribution(rightKeys) :: Nil

  override def output = left.output

  def execute() = {
    buildPlan.execute().zipPartitions(streamedPlan.execute()) { (buildIter, streamIter) => //右表的物理计划运行后生成RDD,利用zipPartitions对Partition进行合并。然后用上述方法实现。
      val hashSet = new java.util.HashSet[Row]()
      var currentRow: Row = null

      // Create a Hash set of buildKeys
      while (buildIter.hasNext) {
        currentRow = buildIter.next()
        val rowKey = buildSideKeyGenerator(currentRow)
        if(!rowKey.anyNull) {
          val keyExists = hashSet.contains(rowKey)
          if (!keyExists) {
            hashSet.add(rowKey)
          }
        }
      }

      val joinKeys = streamSideKeyGenerator()
      streamIter.filter(current => {
        !joinKeys(current).anyNull && hashSet.contains(joinKeys.currentValue)
      })
    }
  }
}

2.2、BroadcastHashJoin

 名约: 广播HashJoin,呵呵。  是InnerHashJoin的实现。这里用到了concurrent并发里的future,异步的广播buildPlan的表运行后的的RDD。

  假设接收到了广播后的表,那么就用streamedPlan来匹配这个广播的表。

  实现是RDD的mapPartitions和HashJoin里的joinIterators最后生成join的结果。
case class BroadcastHashJoin(
     leftKeys: Seq[Expression],
     rightKeys: Seq[Expression],
     buildSide: BuildSide,
     left: SparkPlan,
     right: SparkPlan)(@transient sqlContext: SQLContext) extends BinaryNode with HashJoin {

  override def otherCopyArgs = sqlContext :: Nil

  override def outputPartitioning: Partitioning = left.outputPartitioning

  override def requiredChildDistribution =
    UnspecifiedDistribution :: UnspecifiedDistribution :: Nil

  @transient
  lazy val broadcastFuture = future {  //利用SparkContext广播表
    sqlContext.sparkContext.broadcast(buildPlan.executeCollect())
  }

  def execute() = {
    val broadcastRelation = Await.result(broadcastFuture, 5.minute)

    streamedPlan.execute().mapPartitions { streamedIter =>
      joinIterators(broadcastRelation.value.iterator, streamedIter) //调用joinIterators对每一个分区map
    }
  }
}

2.3、ShuffleHashJoin

ShuffleHashJoin顾名思义就是须要shuffle数据,outputPartitioning是左孩子的的Partitioning。

会依据这个Partitioning进行shuffle。

然后利用SparkContext里的zipPartitions方法对每一个分区进行zip。

这里的requiredChildDistribution。的是ClusteredDistribution,这个会在HashPartitioning里面进行匹配。

关于这里面的分区这里不赘述,能够去org.apache.spark.sql.catalyst.plans.physical下的partitioning里面去查看。
case class ShuffledHashJoin(
    leftKeys: Seq[Expression],
    rightKeys: Seq[Expression],
    buildSide: BuildSide,
    left: SparkPlan,
    right: SparkPlan) extends BinaryNode with HashJoin {

  override def outputPartitioning: Partitioning = left.outputPartitioning

  override def requiredChildDistribution =
    ClusteredDistribution(leftKeys) :: ClusteredDistribution(rightKeys) :: Nil

  def execute() = {
    buildPlan.execute().zipPartitions(streamedPlan.execute()) {
      (buildIter, streamIter) => joinIterators(buildIter, streamIter)
    }
  }
}


未完待续 :)

原创文章,转载请注明:

转载自:OopsOutOfMemory盛利的Blog。作者: OopsOutOfMemory

本文链接地址:http://blog.csdn.net/oopsoom/article/details/38274621

注:本文基于署名-非商业性使用-禁止演绎 2.5 中国大陆(CC BY-NC-ND 2.5 CN)协议。欢迎转载、转发和评论。可是请保留本文作者署名和文章链接。如若须要用于商业目的或者与授权方面的协商,请联系我。

技术分享


Spark SQL 源代码分析之Physical Plan 到 RDD的详细实现

标签:ide   set   select   sof   sample   封装   element   rod   并发   

人气教程排行