时间:2021-07-01 10:21:17 帮助过:1人阅读
2、Config servers:配置服务器,Cluster的枢纽部分,用于保存metadata数据;production环境中,需要三个(exactly)config servers,所有的config servers都有效时才能将metadata保存成功;这三个config servers并非replica set结构,它们独立部署。(参见下文)不过对于测试环境,可以只有一个config server;如果只有一个config server,那么它将成为集群的单点。如果config servers失效,那么整个集群也将无法访问,如果metadata数据丢失,那么整个集群将无法使用。
Config servers将metadata保存在config数据库中(稍后介绍),mongos实例将会从config server中获取metadata并在本地缓存,并用于路由reads、writes请求。mongodb只会在“chunk迁移之后”、“chunk split之后”才会修改metadata数据。当需要修改metadata时,协调者(mongos)将会把变更指令发送给三个config servers并获得它们的响应结果,如果结果不同,则意味着产生了数据不一致的情况,则可能需要人工干预;此时,balancer也将不会执行chunk迁移,mongos也不会执行chunks分裂。
当mongos启动时将会从config servers获取metadata信息,运行时的某些错误也会导致mongos重新获取metadata。此外,config server中还保存了一些locks,我们稍后介绍。
只要有一个config server失效,那么集群的metadata都将处于只读状态,可以对shards进行数据读写,但是chunks分离和迁移将不能进行,知道三个config servers全部有效为止。如果三个config servers都失效,那么意味着集群将不能读取metadata数据,如果此时重启mongos,那么它将不能获取metadata数据,将无法提供router,直到config servers有效。此外,metadata的数据量非常小,所以这不会对Config servers或者mongos带来存储上的压力。Config server的负载非常小,它对硬件配置要求很低,只需要较少的内存和存储空间即可。
prodution环境中,需要三个config servers;如果仅仅为了测试可以只需要一个。
备注:mongodb 3.2+版本终于调整了Config servers的部署模式(这里总有问不完的“为什么需要三个Config Servers”),放弃了必须使用三个Config Servers的要求;Config Servers可以采用“Replica set”架构模式,且必须使用WiredTiger存储引擎。这种调整,可以有效的提升config servers的数据一致性,可能利用replica set架构的优点,Config Servers的个数可以扩展到50个节点。不过replica set中,不能有“arbiters”、“delayed”类型的members,且它们的“buildIndexes”必须设定为true。
3、mongos:routers,本身不保存任何用户数据,负责转发客户端的读写请求、对shard的结果进行收集归并、运行balancer进程、跟踪split等;通常我们在每个application节点上部署一个mongos,因为mongos占用内存极少,几乎不占用磁盘,它只需要消耗一定的内存、CPU用于处理数据即可,此外这样部署application与mongos通信距离最短、效率较高。你可能想在applications与mongos之间搭建提个proxy或者负载均衡器,这种方式是很难实施的,而且可能会带来很多的问题,proxy需要能够对mongodb的数据protocol进行编解码。对于read、write操作,客户端通常会随机选择一个mongos,这在一定程度上提供了简单的负载均衡;不过对于有Cursor的read操作,在Cursor遍历期间,请求只会发送给一个mongos,因为只有那个mongos持有Curosr信息。
如果query中指定了sort,mongos将$orderby参数传递到选定的shards上(根据shard key选定chunks和shards),有此database的primary shard负责接收和merge每个shard排序后的结果,并将结果通过mongos返回给客户端。如果query指定了limit(),那么mongos将limit传给指定的shards,每个shards通过limit限定数据返回的条数,最终mongos收到的数据条数可能大于limit,所以mongos需要再次应用limit,然后才将结果返回给客户端。如果客户端使用了skip(),mongos不会将skip参数传递给shards,因为这这对结果筛选没有帮助,mongos将收到shards尚未skip的数据,然后skip并组装数据返回给客户端,主要原因是,每个shard时刻都会有新的数据插入,所以mongos无法提前计算该从何处skip。如果skip和limit同时使用,这稍微简单一些,mongos将limit + skip的和,作为limit传递给shards,然后和skip一样,再次在本地执行skip,用于提高查询的性能。这也要求women,需要skip操作时,尽可能指定limit以提高效率,而且在sharding环境中,使用sort通常是一个比较高耗的操作(尽管shard key索引是有序的)。
对于没有指定shard key的查询、update、remove以及“聚合”方法,mongos都会将操作广播给所有的shards。对于以上非sharded collection,它们的数据会被保存在primary shard上,尽管application可以直接链接此shard获取数据,但是为了集群数据访问的协调性,我们建议仍使用mongos作为router。
二、shard key(分片键)
shark key可以决定collection数据在集群的分布,shard key必须为索引字段或者为组合索引的左前缀。documents插入成功后,任何update操作都不能修改shard key,否则会抛出异常。我们不能将
“multikey index”作为shard key。
Hashed类型分片键,只能对单个Filed建立hashed索引;所以选择分片键需要非常慎重,最好它具有较好的“维度”(cardinality,基数),即此字段的重复值较少;单调递增的字段值作为Hashed分片键是一个不错的选择,比如ObjectId或者timestamp。如果对一个空的collection使用Hashed分片键,默认情况下mongodb自动在每个shard节点上创建2个空的chunks,不过我们可以在shardCollection指令中指定“numInitialChunks”参数来限定初始化chunks的个数。
在选择shard key时,需要考虑到应用的需求,读写比、以及读取数据的方式。如果cluster有较大的write请求,极少的read或者update,那么shard key就需要注意write压力的分流,尽可能让write操作分散在多个shards上,比如采用Hashed分区、使用ObjectId(单调递增)作为shard key。如果read量很大,只有较少的write,此时需要考虑read的方式,如果通常为range查询(比如timestamp > 某个时间),那么就需要使用Range分区 + 单调递增的shard key方式(timestamp),如果通常查询的匹配方式通常为“相等”比较,那么采用Hash分区可以获得更好的性能。
就是高效的查询方式就是mongos只需要将请求转发到单个shard上,相反,如果查询中没有指定shard key,mongos将会把请求转发到所有的shards,并且等待它们都返回结果,这种“scatter/gather”方式会导致操作时间很长,通常在“聚合方式”中才会出现。如果查询时指定的完整的shard key字段(可能为组合键),那么mongos只会将请求路由到一个shard上;如果查询指定了shard key字段的最左前缀,那么mongos将可能将请求路由到少数多个shards,而且覆盖shard key的字段数量越多,参与查询的shard个数将越少,这个原理和索引的特性非常类似;比如shard key为{"zipcode":1,"name":1,"age":1},那么查询条件为{"zipcode":"10010","name":1}将比只使用"zipcode"查询获得的性能更高,参与查询的shard更少。
通常我们应该使用组合字段作为shard key,除非能够断定某单个字段值是“唯一的、不重复的”才会使用单个字段作为shard key,最终组合字段必须能够提高cardinality(降低重复值),这样对chunk分裂有很大的帮助。
三、sharding机制
1、balancing:如果一个shard上chunks比其他shard更多,即不平衡状态,那么mongos将会自动对chunks迁移以达到平衡,balancing的过程不会影响用户的数据操作。集群中任何mongos实例都可以启动balancing线程,默认balancer是开启状态;Config 数据库(Config servers中)中有个lock表,当balancer活跃时,相应的mongos将会尝试通过修改document方式获取“lock”,如果获取“lock”成功,则此mongos负责balancing工作。大家需要注意,mongos实例的本地系统时间会对lock机制带来影响,需要所有的mongos(包括集群中的所有shards、config servers)的时间保持一致(ntpd指令)。
balancer将chunks从持有chunks最多的shard上迁移到持久chunks最少的shard,每次迁移一个,直到集群相对平衡(最多与最少之间相差不超过2个)。chunks迁移可能会消耗磁盘空间,那些已经迁移出去的chunks不会立即删除,而是归档到一个特定的目录下,“归档”(archive)特性默认是开启的;此外迁移还会消耗一定的网络带宽,或许会影响到性能,影响用户操作的IO吞吐量。建议每次迁移一个chunk,且只有当“最多”与“最少”的差值达到threshold时才开始balancer;或者指定一个时间区间,balancer只会在此时间段内才会迁移chunks。
threshold:最小化balancing对集群的影响,只有当shards上“最多”与“最少”chunks个数差值达到阀值时,才会重新平衡chunks分布。threshold的值目前没有办法修改,当chunks总数< 20时,此值为2,总数 >= 80时,此值为8,其他为4。一旦balancing工作启动,只有当chunks分布均衡后才会停止,即“最多”与“最少”的差值不大于2。
默认情况下,mongodb会尽可能的耗尽可用磁盘空间,所以我们需要关注mongodb对磁盘的消耗量;不过当向集群中添加shard节点时,可以指定当前shard允许使用的最大磁盘空间(max size),当shard的磁盘消耗量达到最大值后,balancer将不会向其再迁移chunks,但这不影响此shard上继续接受write操作。(参见下文addShard指令)
2、chunks迁移过程:
最后一步主要是source shard等待cursor关闭并删除chunk,称为“删除阶段”,不过balancer可以不需要等待它结束即可开始下一个chunk的迁移,在一定程度上提高了迁移的效率,可以让chunks数据尽快迁移完毕,集群尽快达到均衡。有时候,“删除阶段”可能需要等到很长时间,那么我们可以指定“_waitForDelete”参数表示等待“删除阶段”的最长时间,超时后balancer将放弃等待转而开始迁移下一个chunk。
如果被迁移的chunk尺寸已经超过了设定值或者其持有的documents个数超过最大值(参见此文),它将不能被迁移,需要等待被split后才能迁移。
_secondaryThrottle(节流,阀门):通常每个shard是一个replica set结构,对于chunk迁移,其实就是destination批量读取source中的documents并写入到replica set的过程(primary),这时就涉及到“write concern”问题,即write写入到多少个secondaries之后才返回。sharding环境中“_secondaryThrottle”参数就是用于控制此特性,默认为true,表示至少同步给一个secondary,语义等同于write concern中的{w:2},只有chunks中所有的documents同步到至少一个secondary后才会继续迁移下一个chunk;可以将此值设置为false,即关闭“阀门”,默认效果等同于{w : 1},不过此时我们还可以额外的指定“write concern”参数表示documents需要同步到多个secondaries。(运维方式参见下文)
3、spit:默认每个chunk的大小为64M,我们可以调节此值;较小的chunk可以将使数据分布的更加均衡,便于迁移,但是带来的问题就是split更加频繁,也增加了mongos路由的开支(每个chunk持有的数据量小,每个query意味着需要访问的chunk个数较多);较大的chunk不便于迁移,但是split次数较少,metadata信息较少,mongos路由简单,不过如果chunk过大会导致数据分布不均。不过个人认为64M还是太小了,建议增加到256M。
因为spit操作只会有insert或者update触发。
4、shark key indexes:前文已经了解到,将一个collection开启sharding时需要指定shard key,不过在此之前,需要创建一个以shard key字段开头的索引。比如shard key为{"zipcode" : 1,"username" : 1},那么需要创建所以{"zipcode" : 1,"username" : 1}或者{"zipcode" : 1,"username" : 1,"others":1...}。
四、部署
我们本机构建一个sharding测试环境,节点部署列表如下:
1)shard:2个,端口分别为27018、28018,单节点。(提示,线上环境,至少2个shards,且每个shard都是replica set结构)
2)config server:1个,端口为27019。(提示,线上环境,必须三个config servers)
3)mongos:一个,端口为27017。(提示,线上环境,随application节点部署,通常有多个)
需要注意,我们确保所有同类型的节点的配置一样(除端口、文件路径外),以免出现问题。如下配置是基于“测试环境”的,如果为production,需要将“smallFiles”设置为true。
1、Config server部署
Java代码
大家需要清楚,config server上需要保存数据,比如“config”数据库,所以需要配置engine的参数;此外比较重要的就是sharding部分,指定clusterRole为“configsvr”。
Java代码
2、shard节点部署:本例中有2个shard节点,配置文件除了port和dbpath不同之外,其他配置一样,如下为shard_0.conf示例:
Java代码
配置与config server差不多,需要注意的是clusterRole需要为“shardsvr”;此外我们设定了“archiveMovedChunks”为false表示在chunks迁移完成之后直接删除,否则将chunks移动到“moveChunk”目录下。
Java代码
3、mongos部署
Java代码
mongos不需要存储任何数据,所以它不需要配置storage有关的参数,最重要的参数为configDB,指定congfig servers的地址列表,如果为多个则已“,”分割。启动mongos进程:
Java代码
4、addShard:上文的配置文件可知,mongos配置了config servers的地址,那么mongos与config servers可以建立通讯;但是我们尚没有看到shard节点如何参与到集群的。在sharding集群中,提供了addShard指令,我们可以在运行时动态添加shard节点。注意以后几乎所有的用户操作,均需要通过mongos,我们通过mongo shell链接到mongos并执行如下操作,将shard_0和shard_1添加到sharding集群中:
Java代码
我们将两个“孤立”的shard通过addShard方法添加到sharding集群,addShard方法接收host地址,如果shard为replica set结构,那么需要通过addShard方法将所有的members添加到集群(复制集架构模式参):
Java代码我们可以通过sh.status()方法查看sharding集群的状态,其中包括shards列表信息。
如果你想限制shard的磁盘使用量,则使用addShard指令来指定maxSize(单位MB)
Java代码
5、开启sharding
sharding集群已经构建完成,接下来需要存储数据;但是首先需要将Database开启sharding,否则数据仍然无法在集群中分布,即数据库、collection默认为non-sharding。对于non-sharding的database或者collection均会保存在primary shard上(概念参见上文),直到开启sharding才会在集群中分布。
Java代码
此后我们可以对collection开启sharding,在此之前需要先指定shard key和建立“shard key索引”,我们根据application对数据访问的模式,来设定shard key,比如我们有一个address表用户的地址,这个表通常使用zipcode来查询数据():
Java代码
那么address表将使用“zipcode”作为第一维shard key,采用range分区模式,如果某个chunk中的数据到达“max chunk size”时将会根据zipcode分裂成2个chunk;如果某个chunk中所有的documents的zipcode都一样时,则会使用“name”作为第二维shard key,仍采用range分区模式(name可以为字符串,根据其索引排序分裂),将此chunk分裂成2个,我们可以通过sh.status()查看每个chunk的分裂区间:
Java代码
我们可以看出“name”在chunk中是按照字典顺序排序的。我们使用“组合shard key”,在一定程度上可以提高选择性的维度和chunk的可分裂性,如果你是在找不到组合key那么可以将_id作为补充字段:
Java代码
比如有一个orders表,用于保存用户的订单,这个表通常根据用户id查询,那么我们可以对“userid”字段建立hash索引,以及建立hash分区的shard key:
Java代码
前文已经提到,如果使用hash分区的话,那么shard key的重复值一定要尽可能的少,否则这些相同值的document将会保存在同一个shard上,而导致shard无法分裂,从而失去sharding的意义。“_id”字段可以在hash分区时非常有效,可以考虑选用。
五、运维
1、查看集群信息
上文中我们提到sh.status()方法,此方法可以查看集群中有关“database是否开启sharding”、“primary shard的位置”、“collection的chunks列表”等详细信息,因为版面问题,暂不赘言,请参考“db.status()”。
此外,我们在“config”数据库中也可以看到很多系统自建的collections:
Java代码
比如我要查看集群中chunks的列表,那么可以直接从“chunks”这个collection查询即可。“collections”表存储了每个collection的配置信息,“databases”表可以查看是否开启了sharding,“setttings”表中查看cluster的整体配置信息等等。
2、Balancer配置
balancer运行在mongos实例上,控制chunks的分布和迁移,全局只有一个balancer处于active状态,我们可以通过“sh.getBalancerState()”或者“sh.status()”查看balancer是否开启,可以通过“sh.getBalancerHost()”查看balancer运行在哪个mongos上。
1)可以通过sh.setBalancerState(false)来关闭balancer功能。当然可以通过设定为true开启balancer。
2)尅通过sh.startBlancer()或者sh.stopBalancer()来开关闭balancer。同上。
3)可以通过db.locks.find({_id:"balancer"})查看balancer持有锁的情况。
4)可以通过修改settting表中的配置来指定balancer的运行时间区间:
Java代码
其中start和stop格式为“HH:mm”,不需要指定日期。修改activeWindow配置时需要确保balancer的state为true。
3、_secondaryThrottle与waitForDelete
这两个参数都与chunk迁移有关,其中_secondaryThrottle表示是对secondary进行节流,默认为true,其语义等同write concern中的{w:2},即当chunk迁移时(documents复制)destination shard中至少有一个secondary接收成功,balancer才会继续进行下一个chunk;不过开发者可以关闭此参数(同{w:1}),同时与write concern一起使用:
Java代码
_waitForDelete表示balancer是否等待source shard删除已经迁移成功的chunk后才继续处理下一个chunk,默认为false,即不等待。
Java代码
4、split
chunk的分裂通常由mongos于shard配合自动完成,不过有些情况下我们可能希望手动split:
Java代码spiltFind语义为:查找符合条件的第一个document所在的chunk,并将此chunk分裂成相同大小的2份。splitAt语义为:查找符合条件的第一个document所在的chunk,并以其为界限分成2个chunk,有可能这两个chunk大小不等。
5、修改chunk size配置
chunkSize默认为64,需要在mongos配置文件中指定,我们也通过指令修改:
Java代码
其他:
1)config数据库详解:https://docs.mongodb.org/manual/reference/config-database/,从中我们可以了解到sharding集群的通讯方式。
2)sharding指令和shell方法参考:https://docs.mongodb.org/manual/reference/sharding/
3)http://www.slideshare.net/deysigmarra/mongo-db-shardingguide
Mongodb中Sharding集群
标签: