【原创】经验分享(15)spark sql limit实现原理
时间:2021-07-01 10:21:17
帮助过:19人阅读
class CollectLimitExec(limit: Int, child: SparkPlan)
extends UnaryExecNode {
...
protected override def doExecute(): RDD[InternalRow] =
{
val locallyLimited =
child.execute().mapPartitionsInternal(_.take(limit))
val shuffled =
new ShuffledRowRDD(
ShuffleExchange.prepareShuffleDependency(
locallyLimited, child.output, SinglePartition, serializer))
shuffled.mapPartitionsInternal(_.take(limit))
}
可见实现非常简单,首先调用SparkPlan.execute得到结果的RDD,然后从每个partition中取前limit个row得到一个新的RDD,然后再将这个新的RDD变成一个分区,然后再取前limit个,这样就得到最终的结果。
【原创】经验分享(15)spark sql limit实现原理
标签:https test tor div table intern ide 原创 from