当前位置:Gxlcms > 数据库问题 > 【原创】经验分享(15)spark sql limit实现原理

【原创】经验分享(15)spark sql limit实现原理

时间:2021-07-01 10:21:17 帮助过:19人阅读

class CollectLimitExec(limit: Int, child: SparkPlan) extends UnaryExecNode { ... protected override def doExecute(): RDD[InternalRow] = { val locallyLimited = child.execute().mapPartitionsInternal(_.take(limit)) val shuffled = new ShuffledRowRDD( ShuffleExchange.prepareShuffleDependency( locallyLimited, child.output, SinglePartition, serializer)) shuffled.mapPartitionsInternal(_.take(limit)) }

可见实现非常简单,首先调用SparkPlan.execute得到结果的RDD,然后从每个partition中取前limit个row得到一个新的RDD,然后再将这个新的RDD变成一个分区,然后再取前limit个,这样就得到最终的结果。

 

【原创】经验分享(15)spark sql limit实现原理

标签:https   test   tor   div   table   intern   ide   原创   from   

人气教程排行