时间:2021-07-01 10:21:17 帮助过:11人阅读
这里打印出来tableName是src, 和一个是否要cache的boolean flag.
我们看下CacheCommand的构造:
CacheCommand支持2种操作,一种是把数据源加载带内存中,一种是将数据源从内存中卸载。
对应于SQLContext下的cacheTable和uncacheTabele。
[java] view plain copy
如果调用cached.collect(),则会根据Command命令来执行cache或者uncache操作,这里我们执行cache操作。
cached.collect()将会调用SQLContext下的cacheTable函数:
首先通过catalog查询关系,构造一个SchemaRDD。
[java] view plain copy
找到该Schema的analyzed计划。匹配构造InMemoryRelation:
[java] view plain copy
InMemoryRelation继承自LogicalPlan,是Spark1.1 Spark SQL里新添加的一种TreeNode,也是catalyst里的一种plan. 现在TreeNode变成了4种:
1、BinaryNode 二元节点
2、LeafNode 叶子节点
3、UnaryNode 单孩子节点
4、InMemoryRelation 内存关系型节点
类图如下:
值得注意的是,_cachedColumnBuffers这个类型为RDD[Array[ByteBuffer]]的私有字段。
这个封装就是面向列的存储ByteBuffer。前面提到相较于plain java object存储记录,用ByteBuffer能显著的提高存储效率,减少内存占用。并且按列查询的速度会非常快。
InMemoryRelation具体实现如下:
构造一个InMemoryRelation需要该Relation的output Attributes,是否需要useCoompression来压缩,默认为false,一次处理的多少行数据batchSize, child 即SparkPlan。
[java] view plain copy
可以通过设置:
spark.sql.inMemoryColumnarStorage.compressed 为true来设置内存中的列存储是否需要压缩。
spark.sql.inMemoryColumnarStorage.batchSize 来设置一次处理多少row
spark.sql.defaultSizeInBytes 来设置初始化的column的bufferbytes的默认大小,这里只是其中一个参数。
这些参数都可以在源码中设置,都在SQL Conf
[java] view plain copy
再回到case class InMemoryRelation:
_cachedColumnBuffers就是我们最终将table放入内存的存储句柄,是一个RDD[Array[ByteBuffer]。
1、判断_cachedColumnBuffers是否为null,如果不是null,则已经Cache了当前table,重复cache不会触发cache操作。
2、child是SparkPlan,即执行hive table scan,测试我拿sbt/sbt hive/console里test里的src table为例,操作是扫描这张表。这个表有2个字的key是int, value 是string
3、拿到child的output, 这里的output就是 key, value2个列。
4、执行mapPartitions操作,对当前RDD的每个分区的数据进行操作。
5、对于每一个分区,迭代里面的数据生成新的Iterator。每个Iterator里面是Array[ByteBuffer]
6、对于child.output的每一列,都会生成一个ColumnBuilder,最后组合为一个columnBuilders是一个数组。
7、数组内每个CommandBuilder持有一个ByteBuffer
8、遍历原始分区的记录,将对于的行转为列,并将数据存到ByteBuffer内。
9、最后将此RDD调用cache方法,将RDD缓存。
10、将cached赋给_cachedColumnBuffers。
此操作总结下来是:执行hive table scan操作,返回的MapPartitionsRDD对其重新定义mapPartition方法,将其行转列,并且最终cache到内存中。
所有流程如下:
[java] view plain copy[java] view plain copy
这里会声明一个数组,来对应每一列的存储,如下图:
然后初始化类型builder的时候会传入的参数:
initialBufferSize:文章开头的图中会有ByteBuffer,ByteBuffer的初始化大小是如何计算的?
initialBufferSize = 列类型默认长度 × batchSize ,默认batchSize是1000
拿Int类型举例,initialBufferSize of IntegerType = 4 * 1000
attribute.name即字段名age,name etc。。。
ColumnType封装了 该类型的 typeId 和 该类型的 defaultSize。并且提供了extract、append\getField方法,来向buffer里追加和获取数据。
如IntegerType typeId 为0, defaultSize 4 ......
详细看下类图,画的不是非常严格的类图,主要为了展示目前类型系统:
ColumnBuilder的主要职责是:管理ByteBuffer,包括初始化buffer,添加数据到buffer内,检查剩余空间,和申请新的空间这几项主要职责。
initialize负责初始化buffer。
appendFrom是负责添加数据。
ensureFreeSpace确保buffer的长度动态增加。
类图如下:
初始化大小initialSize:拿Int举例,在前面builder初始化传入的是4×batchSize=4*1000,initialSize也就是4KB,如果没有传入initialSize,则默认是1024×1024。
列名称,是否需要压缩,都是需要传入的。
ByteBuffer声明时预留了4个字节,为了放column type id,这个在ColumnType的构造里有介绍过。
[java] view plain copy
存储的方式如下:
Int的type id 是0, string的 type id 是 7. 后面就是实际存储的数据了。
存储结构都介绍完毕,最后开始对Table进行scan了,scan后对每一个分区的每个Row进行操作遍历:
1、读每个分区的每条Row
2、获取每个列的值,从builders数组里找到索引 i 对应的bytebuffer,追加至bytebuffer。
[java] view plain copy
追加过程:
根据当前builder的类型,从row的对应索引中取出值,最后追加到builder的bytebuffer内。
[java] view plain copy
ensureFreeSpace:
主要是操作buffer,如果要追加的数据大于剩余空间,就扩大buffer。
[java] view plain copy
......
最后调用MapPartitionsRDD.cache(),将该RDD缓存并添加到spark cache管理中。
至此,我们将一张spark sql table缓存到了spark的jvm中。
对于数据的存储结构,我们常常关注持久化的存储结构,并且在长久时间内有了很多种高效结构。
但是在实时性的要求下,内存数据库越来越被关注,如何优化内存数据库的存储结构,是一个重点,也是一个难点。
对于Spark SQL 和 Shark 里的列存储 是一种优化方案,提高了关系查询中列查询的速度,和减少了内存占用。但是中存储方式还是比较简单的,没有额外的元数据和索引来提高查询效率,希望以后能了解到更多的In-Memory Storage。
——EOF——
创文章,转载请注明:
转载自:OopsOutOfMemory盛利的Blog,作者: OopsOutOfMemory
本文链接地址:http://blog.csdn.net/oopsoom/article/details/39525483
注:本文基于署名-非商业性使用-禁止演绎 2.5 中国大陆(CC BY-NC-ND 2.5 CN)协议,欢迎转载、转发和评论,但是请保留本文作者署名和文章链接。如若需要用于商业目的或者与授权方面的协商,请联系我。
转自:http://blog.csdn.net/oopsoom/article/details/39525483
第九篇:Spark SQL 源码分析之 In-Memory Columnar Storage源码分析之 cache table
标签:efault tin xtend gis 初始化 内存数据库 memory scala treenode