时间:2021-07-01 10:21:17 帮助过:9人阅读
在Redis中,用户可以通过SLAVEOF
命令或是slaveof
选项设置服务器的主从关系,从(SLAVE)服务器会复制主(Master)服务器。
旧版复制功能主要分为两个过程: 同步(SYNC)和命令传播(COMMAND PROPGATE)。
同步过程:
SLAVEOF
命令时,从服务器会向主服务器发送SYNC
命令SYNC
命令后,开始BGSAVE
,生成RDB文件,同时使用一个缓冲区记录从==同步开始时==执行的所有==写命令==BGSAVE
命令,将生成的RDB文件发送给从服务器,从服务器载入这个RDB文件,将数据库状态同步到==主服务器开始同步时==的状态命令传播:
经过同步过程,主从服务器==第一次达到了同步的状态==。但是这状态并非一成不变的,主服务器执行的写命令,仍然会导致主从服务器状态不一致。因此就需要命令传播维持主从一致。
命令传播就是主服务器将自己执行的写命令发送给从服务器执行,保持主从服务器在同步后执行的写命令一致。
对于非初次复制的从服务器而言(从服务器断线后重连),仍然需要再次进行一次同步的过程。由于同步过程涉及RDB文件的生成,RDB文件的发送,在SYNC时需要使用CPU和带宽,代价较大。而对于断线后重连的情况,由于主从服务器在断线前都是一致的状态,因此只需要考虑同步断线到重连这段时间内的数据即可。
新版本采用PSYNC
命令代替SYNC
命令。PSYNC
命令有完整重同步和部分重同步两种模式:
SYNC
一致部分重同步:用于断线后重连的情况,尽量避免再次执行SYNC
的过程。
服务器的运行ID
复制偏移量long long master_repl_offset
:
主从服务器会维持一个自己的复制偏移量,主服务器每次向从服务器传播N个==字节==时,就将自己的复制偏移量增加N。从服务器每次收到主服务器传来的N个字节的数据时,就将自己的复制偏移量增加N。
通过比对主从服务器的复制偏移量,可以判断主从服务器是否一致。
复制积压缓冲区char* repl_backlog
:
复制积压缓冲区是由主服务器维护的一个固定长度long long repl_backlog_size
的先进先出的队列(缓冲区满时,后进的字符将挤出先进的字符)。并且维护了一个缓冲区中的首字符的复制偏移量long long repl_backlog_off
。
当从服务器重连后,通过PSYNC
命令发送自己的复制偏移量,主服务器校验该偏移量后一位(offset + 1
)是否仍在复制积压缓冲区中,如果存在,说明可以通过部分重同步,如果不存在,只能通过完全重同步。
服务器运行IDchar master_replid[CONFIG_RUN_ID_SIZE+1]
:
服务器运行ID是从服务器初次向主服务器复制时,所保存主服务器的运行ID。当从服务器重连时,会向主服务器发送运行时ID,主服务器根据是否是自己的运行时ID,决定执行部分重同步还是完全重同步。
PSYNC
命令格式如下:
PSYNC <runid> <offset>
当从服务器初次复制时,runid
为?
,offset
为-1
。非初次复制时,runid
为上次PSYNC
时传递的运行时ID,offset
为从服务器断线前维护的复制偏移量。
复制的流程主要分为以下步骤:
SLAVEOF
命令: SLAVEOF <master_ip> <master_port>
SLAVEOF
命令后,将命令指定的IP
和端口保存到char *masterhost
和int masterport
中。SLAVEOF
命令是一个异步命令,设置完成后便向客户端返回OK
。IP
和端口,向主服务器创建套接字的链接:PING
命令:PONG
时,才可以继续执行复制工作的下个步骤。masterauth
选项,从服务器会发送AUTH
命令,命令的参数为masterauth
的值,该值将会于主服务器的requirepass
选项比较,校验通过时才可以进行下一步复制。当校验失败,或者是仅有一方设置了相关选项,主服务器都会返回错误。REPLCONF listening-port <port-number>
告诉主服务器监听的端口号,该信息将被保存在redisClient
的int slave_listening_port
中,用来在主服务器执行INFO replication
命令时打印从服务器的端口。PSYNC
命令,执行同步操作,将数据库状态和主服务器保持一致。在这一步,主服务器也会成为从服务器的客户端。因为从服务器需要执行主服务器发送的缓冲区或是挤压缓冲区中的命令。复制过程流程图如下:
在命令传播阶段,从服务器会默认以美妙一次的频率向主服务器发送命令:
REPLCONF ACK <replication_offset>
其中replication_offset
是从服务器维护的复制偏移量。
REPLCONF ACK
命令主要用三个目的:
REPLCONF ACK
命令,那么说明主从服务器间的连接出现问题了。我们可以通过向主服务器发送INFO replication
命令,在列出从服务器列表的lag
一栏中,看到相应从服务器最后一次向主服务器fasongREPLCONF ACK
距现在过了多少秒。min-slaves
选项min-slaves-max-lag
比较的对象就是从客户端的lag
值。RELPCONF ACK
向主服务器发送复制偏移量,那么主服务可以比较二者的偏移量,实现命令补发,达到校准从服务器的目的。Sentinel
(哨兵模式)是Redis高可用性的方案, 一个或多个Sentinel
实例组成Sentinel
系统,负责监视任意多个主服务器及其从服务器,当一个主服务器进入下线状态时,将主服务器的某个从服务器提升为主服务器。
可以通过redis-sentinel /configpath/sentinel.conf
或是redis-server /configpath/sentinel.conf --sentinel
命令来启动。当Sentinel
启动时需要经历以下步骤:
Sentinel
的代码Sentinel
状态Sentinel
实例本质上是一个特殊的Redis
服务器,所以初始化Sentinel
的初始化过程和Redis
服务器的初始化过程很多步骤都是相同的,比如都在main
方法中启动,且会对启动参数进行解析等。Redis
会根据启动命令判断是运行Redis
服务器还是Sentinel
服务器。
Sentinel
的专用代码启动Sentinel
服务器时,Redis
会执行一些不同于Redis
服务器的特殊步骤:
if (server.sentinel_mode) {
initSentinelConfig();
initSentinel();
}
initSentinelConfig
会根据Sentinel
的配置设置监听的端口。
initSentinel
中的过程可以分成两部,第一步用Sentinel
专用的命令表替换一般Redis
服务器的命令表。第二步是初始化sentinelState
结构。
Sentinel
状态初始化Sentinel
状态就是初始化sentienlState
这个结构体:
/* Main state. */
struct sentinelState {
//用来记录当前纪元
uint64_t current_epoch; /* Current epoch. */
//记录mater服务器的字段表,其中键是master的名字,value是master对应的sentinelRedisInstance实例
dict *masters; /* Dictionary of master sentinelRedisInstances.
Key is the instance name, value is the
sentinelRedisInstance structure pointer. */
int tilt; /* Are we in TILT mode? */
int running_scripts; /* Number of scripts in execution right now. */
mstime_t tilt_start_time; /* When TITL started. */
mstime_t previous_time; /* Last time we ran the time handler. */
list *scripts_queue; /* Queue of user scripts to execute. */
char *announce_ip; /* IP addr that is gossiped to other sentinels if
not NULL. */
int announce_port; /* Port that is gossiped to other sentinels if
non zero. */
} sentinel;
其中sentinelRedisInstance
是sentienl为服务器创建的实例,其可以表示主服务器,从服务器和其他sentinel服务器。
typedef struct sentinelRedisInstance {
//标志位,可以代码不同的实例状态
int flags; /* See SRI_... defines */
//主服务器的名字
char *name; /* Master name from the point of view of this sentinel. */
//该实例对应服务器的运行ID
char *runid; /* run ID of this instance. */
//纪元
uint64_t config_epoch; /* Configuration epoch. */
//主服务器地址
sentinelAddr *addr; /* Master host. */
redisAsyncContext *cc; /* Hiredis context for commands. */
redisAsyncContext *pc; /* Hiredis context for Pub / Sub. */
int pending_commands; /* Number of commands sent waiting for a reply. */
mstime_t cc_conn_time; /* cc connection time. */
mstime_t pc_conn_time; /* pc connection time. */
mstime_t pc_last_activity; /* Last time we received any message. */
mstime_t last_avail_time; /* Last time the instance replied to ping with
a reply we consider valid. */
mstime_t last_ping_time; /* Last time a pending ping was sent in the
context of the current command connection
with the instance. 0 if still not sent or
if pong already received. */
mstime_t last_pong_time; /* Last time the instance replied to ping,
whatever the reply was. That's used to check
if the link is idle and must be reconnected. */
mstime_t last_pub_time; /* Last time we sent hello via Pub/Sub. */
mstime_t last_hello_time; /* Only used if SRI_SENTINEL is set. Last time
we received a hello from this Sentinel
via Pub/Sub. */
mstime_t last_master_down_reply_time; /* Time of last reply to
SENTINEL is-master-down command. */
mstime_t s_down_since_time; /* Subjectively down since time. */
mstime_t o_down_since_time; /* Objectively down since time. */
mstime_t down_after_period; /* Consider it down after that period. */
mstime_t info_refresh; /* Time at which we received INFO output from it. */
/* Role and the first time we observed it.
* This is useful in order to delay replacing what the instance reports
* with our own configuration. We need to always wait some time in order
* to give a chance to the leader to report the new configuration before
* we do silly things. */
int role_reported;
mstime_t role_reported_time;
mstime_t slave_conf_change_time; /* Last time slave master addr changed. */
/* Master服务器指定字段 */
//sentinel字典,存放其他监视该主服务器的sentinel实例,其中key是ip:port,value是sentinelRedisInstance实例
dict *sentinels; /* Other sentinels monitoring the same master. */
//从服务器字典,存放该主服务器的从服务器实例,其中key是ip:port,value是sentinelRedisInstance实例
dict *slaves; /* Slaves for this master instance. */
//配置的quorum数
unsigned int quorum;/* Number of sentinels that need to agree on failure. */
int parallel_syncs; /* How many slaves to reconfigure at same time. */
char *auth_pass; /* Password to use for AUTH against master & slaves. */
/* Slave服务器指定字段 */
mstime_t master_link_down_time; /* Slave replication link down time. */
//优先级 用于failover时排序
int slave_priority; /* Slave priority according to its INFO output. */
//从INFO命令中解析得到的信息
mstime_t slave_reconf_sent_time; /* Time at which we sent SLAVE OF <new> */
//对应主服务的sentinelRedisInstance实例
struct sentinelRedisInstance *master; /* Master instance if it's slave. */
char *slave_master_host; /* Master host as reported by INFO */
int slave_master_port; /* Master port as reported by INFO */
int slave_master_link_status; /* Master link status as reported by INFO */
//从服务器的复制偏移量
unsigned long long slave_repl_offset; /* Slave replication offset. */
/* Failover */
char *leader; /* If this is a master instance, this is the runid of
the Sentinel that should perform the failover. If
this is a Sentinel, this is the runid of the Sentinel
that this Sentinel voted as leader. */
uint64_t leader_epoch; /* Epoch of the 'leader' field. */
uint64_t failover_epoch; /* Epoch of the currently started failover. */
int failover_state; /* See SENTINEL_FAILOVER_STATE_* defines. */
mstime_t failover_state_change_time;
mstime_t failover_start_time; /* Last failover attempt start time. */
mstime_t failover_timeout; /* Max time to refresh failover state. */
mstime_t failover_delay_logged; /* For what failover_start_time value we
logged the failover delay. */
struct sentinelRedisInstance *promoted_slave; /* Promoted slave instance. */
/* Scripts executed to notify admin or reconfigure clients: when they
* are set to NULL no script is executed. */
char *notification_script;
char *client_reconfig_script;
} sentinelRedisInstance;
sentinelRedisInstance
结构较为复杂,上文只针对较为重要的属性添加了中文注释,其余一些记录时间的信息基本上是用来判断服务器是否健康,以及在FailOver
时,筛选从服务器用的信息。
在main
函数中的loadConfig
中,会根据sentinel的配置文件所中声明的需要监视的主服务器的列表创建对应的sentinelRedisInstance
实例,并添加至sentinelState
的maters
字典中。
在main
的initServer
中,会注册serverCron
作为时间事件的回调,而serverCron
在sentinel
模式下,会调用sentinel.c/sentinelTimer
,然后调用sentinel.c/sentinelHandleDictOfRedisInstances
:
void sentinelHandleDictOfRedisInstances(dict *instances) {
dictIterator *di;
dictEntry *de;
sentinelRedisInstance *switch_to_promoted = NULL;
//遍历masters字典上所有的主服务器实例
di = dictGetIterator(instances);
while((de = dictNext(di)) != NULL) {
sentinelRedisInstance *ri = dictGetVal(de);
//处理本实例的事情
sentinelHandleRedisInstance(ri);
//如果该实例是主服务器,则递归调用该方法,同样处理主服务器下的从服务器和sentinel
if (ri->flags & SRI_MASTER) {
sentinelHandleDictOfRedisInstances(ri->slaves);
sentinelHandleDictOfRedisInstances(ri->sentinels);
//主服务器是否在failOver
if (ri->failover_state == SENTINEL_FAILOVER_STATE_UPDATE_CONFIG) {
switch_to_promoted = ri;
}
}
}
//进行failOver
if (switch_to_promoted)
sentinelFailoverSwitchToPromotedSlave(switch_to_promoted);
dictReleaseIterator(di);
}
其中sentinelHandleRedisInstance
是对实例进行的一些定时操作:
void sentinelHandleRedisInstance(sentinelRedisInstance *ri) {
/* ========== MONITORING HALF ============ */
/* Every kind of instance */
//负责处理针对实例的连接
sentinelReconnectInstance(ri);
//周期性发送命令
sentinelSendPeriodicCommands(ri);
/* ============== ACTING HALF ============= */
/* We don't proceed with the acting half if we are in TILT mode.
* TILT happens when we find something odd with the time, like a
* sudden change in the clock. */
if (sentinel.tilt) {
if (mstime()-sentinel.tilt_start_time < SENTINEL_TILT_PERIOD) return;
sentinel.tilt = 0;
sentinelEvent(REDIS_WARNING,"-tilt",NULL,"#tilt mode exited");
}
/* Every kind of instance */
//检测实例是否客观下线
sentinelCheckSubjectivelyDown(ri);
/* Masters and slaves */
if (ri->flags & (SRI_MASTER|SRI_SLAVE)) {
/* Nothing so far. */
}
/* Only masters */
//确认是否主观下线,并开始FailOver
if (ri->flags & SRI_MASTER) {
sentinelCheckObjectivelyDown(ri);
if (sentinelStartFailoverIfNeeded(ri))
sentinelAskMasterStateToOtherSentinels(ri,SENTINEL_ASK_FORCED);
sentinelFailoverStateMachine(ri);
sentinelAskMasterStateToOtherSentinels(ri,SENTINEL_NO_FLAGS);
}
}
在sentinelReconnectInstance
函数中,我们可以看到连接建立的过程:
void sentinelReconnectInstance(sentinelRedisInstance *ri) {
//如果已经建立连接,则直接返回
if (!(ri->flags & SRI_DISCONNECTED)) return;
/* Commands connection. */
//建立命令连接(所有的实例都需要进行命令连接)
if (ri->cc == NULL) {
ri->cc = redisAsyncConnectBind(ri->addr->ip,ri->addr->port,REDIS_BIND_ADDR);
if (ri->cc->err) {
sentinelEvent(REDIS_DEBUG,"-cmd-link-reconnection",ri,"%@ #%s",
ri->cc->errstr);
sentinelKillLink(ri,ri->cc);
} else {
//连接建立成功,并设置响应回调函数
ri->cc_conn_time = mstime();
ri->cc->data = ri;
redisAeAttach(server.el,ri->cc);
redisAsyncSetConnectCallback(ri->cc,
sentinelLinkEstablishedCallback);
redisAsyncSetDisconnectCallback(ri->cc,
sentinelDisconnectCallback);
sentinelSendAuthIfNeeded(ri,ri->cc);
sentinelSetClientName(ri,ri->cc,"cmd");
/* Send a PING ASAP when reconnecting. */
sentinelSendPing(ri);
}
}
/* Pub / Sub */
//建立负责Pub/sub的连接(只针对主从服务器)
if ((ri->flags & (SRI_MASTER|SRI_SLAVE)) && ri->pc == NULL) {
ri->pc = redisAsyncConnectBind(ri->addr->ip,ri->addr->port,REDIS_BIND_ADDR);
if (ri->pc->err) {
sentinelEvent(REDIS_DEBUG,"-pubsub-link-reconnection",ri,"%@ #%s",
ri->pc->errstr);
sentinelKillLink(ri,ri->pc);
} else {
//建立成功,设置响应的回调
int retval;
ri->pc_conn_time = mstime();
ri->pc->data = ri;
redisAeAttach(server.el,ri->pc);
redisAsyncSetConnectCallback(ri->pc,
sentinelLinkEstablishedCallback);
redisAsyncSetDisconnectCallback(ri->pc,
sentinelDisconnectCallback);
sentinelSendAuthIfNeeded(ri,ri->pc);
sentinelSetClientName(ri,ri->pc,"pubsub");
/* Now we subscribe to the Sentinels "Hello" channel. */
retval = redisAsyncCommand(ri->pc,
sentinelReceiveHelloMessages, NULL, "SUBSCRIBE %s",
SENTINEL_HELLO_CHANNEL);
if (retval != REDIS_OK) {
/* If we can't subscribe, the Pub/Sub connection is useless
* and we can simply disconnect it and try again. */
sentinelKillLink(ri,ri->pc);
return;
}
}
}
/* Clear the DISCONNECTED flags only if we have both the connections
* (or just the commands connection if this is a sentinel instance). */
//更新实例的标识位
if (ri->cc && (ri->flags & SRI_SENTINEL || ri->pc))
ri->flags &= ~SRI_DISCONNECTED;
}
当sentinel
建立了向主服务器的连接后,sentinel
会调用sentinelSendPeriodicCommands
周期性的发送一些命令:
void sentinelSendPeriodicCommands(sentinelRedisInstance *ri) {
mstime_t now = mstime();
mstime_t info_period, ping_period;
int retval;
//如果未连接,则直接返回
if (ri->flags & SRI_DISCONNECTED) return;
//如果堆积的任务过多,则直接返回
if (ri->pending_commands >= SENTINEL_MAX_PENDING_COMMANDS) return;
//根据实例的标志位确定INFO发送的周期,sentinel向主从服务器发送的周期为10秒,当主服务器客观下线后,对从服务器发送INFO的周期缩短为1秒
if ((ri->flags & SRI_SLAVE) &&
(ri->master->flags & (SRI_O_DOWN|SRI_FAILOVER_IN_PROGRESS))) {
info_period = 1000;
} else {
info_period = SENTINEL_INFO_PERIOD;
}
/* We ping instances every time the last received pong is older than
* the configured 'down-after-milliseconds' time, but every second
* anyway if 'down-after-milliseconds' is greater than 1 second. */
ping_period = ri->down_after_period;
if (ping_period > SENTINEL_PING_PERIOD) ping_period = SENTINEL_PING_PERIOD;
//如果超过INFO发送周期,则发送INFO命令
if ((ri->flags & SRI_SENTINEL) == 0 &&
(ri->info_refresh == 0 ||
(now - ri->info_refresh) > info_period))
{
/* Send INFO to masters and slaves, not sentinels. */
//异步发送,并设置收到响应的回调
retval = redisAsyncCommand(ri->cc,
sentinelInfoReplyCallback, NULL, "INFO");
if (retval == REDIS_OK) ri->pending_commands++;
} else if ((now - ri->last_pong_time) > ping_period) {
//如果超过PING发送周期,则发送PING命令
/* Send PING to all the three kinds of instances. */
sentinelSendPing(ri);
} else if ((now - ri->last_pub_time) > SENTINEL_PUBLISH_PERIOD) {
//如果超过PUB/SUB发送周期,则发布消息
sentinelSendHello(ri);
}
}
在sentinel
会根据主服务器发送回来的INFO
命令响应,解析得到该主服务器的从服务器,该过程由回调函数sentinelInfoReplyCallback
触发,并调用sentinelRefreshInstanceInfo
解析INFO
的响应,如果解析得到的从服务器在主服务器实例的slaves
字典中找不到,那么将为其创建对应实例,并建立网络连接。如果该从服务器已存在,那么更新master
实例中slaves
的信息。
Sentinel
会与所有服务器(不论主从)都建立两个网络连接,一个用来发送命令(INFO,PING和PUB),另一个连接则用来SUB命令。PUB命令会向监视的主服务器的__sentinel__:hello
频道,发送如下格式的消息:
sentinel_ip,sentinel_port,sentinel_runid,current_epoch,master_name,master_ip,master_port,master_config_epoch
这条信息包含sentinel
和master
服务器的IP,端口,runid 和 epoch(纪元)。
该消息会被所有监视这个服务器的Sentinel
发现,当sentinel
收到这条消息,会比较是否是自己发的消息,如果不是,则会提取其中sentinel
的信息,比较是否为新发现的sentinel
,如果不是,则更新旧sentinel
对应的实例,如果是,则创建新实例,并建立命令连接,用于之后发送PING
消息。
在Sentinel
模式下,各服务器的关系如下:
当Sentinel
在连续时间内(由配置文件中的down-after-millisends
指定)发送的PING命令所得到的响应均为无效回复(超时或是其他错误回复),那么Sentinel
就会在该实例的flags
中将SRI_S_DOWN
的标识位打开。
当Sentienl
认为一个主服务器主观下线后,会向同样监视该主服务器的Sentienl
发送is-mater-down-by-addr
命令询问:
SENTIENL is-master-down-by-addr <ip> <port> <current_epoch> <runid>
其中前两项参数为master
的ip和port。第三项为sentinel
当前的纪元,第四项在询问主观下线时,为*,而在选举领头Sentinel
时,为sentinel
的runid。
当其他sentinel
收到命令后会回复如下格式的请求:
<down_state>
<leader_runid>
<leader_epoch>
其中第一行别是其认为服务器的状态(1表示下线),第二行和第三行在选举领头Sentinel时分别表示该sentiel
的runid
和纪元。在非选举时,为*
和0。
当sentinel
询问其他sentinel
,且超过quorum的票都是下线时,sentinel
就会将对应master
的flags
的客观下线标记打开(主观下线的标记仍然存在)。
Sentinel的启动过程和各服务器之间的发现过程如下图:
发现主服务器主观下线的Sentinel
会向其他同样监视该主服务器的Sentinel
发起一次领头Sentinel
选举。
选举主要有以下规则:
Sentinel
在每个纪元内可以选一次领头Sentinel
(相当于每一轮都有一次投票权),一轮选举结束,无论成功失败,Sentinel
的纪元都将加1。Sentinel
会通过is-master-down-by-addr
向其他Sentinel
发起领头Sentinel
选举,并推荐自己成为领头Sentinel
。Sentinel
会根据先到先得的原则设置它认为的领头Sentinel
(即如果这一轮中它已经设置其他Sentinel
为leader,那么它无法将这次向它推荐的Sentinel
设置leader)。Sentinel
得到超过半数的支持时,它就成为领头Sentienl
,并负责之后的failOver
工作。Sentinel
,那么将进行下一轮的选举。当选领头的Sentinel
将负责故障转移操作,该操作主要包含三个步骤:
故障转移操作的第一步是在已下线的主服务器的所有从服务器中,挑选出一个状态良好,数据完成的从服务器,并向这个从服务器发送SLAVEOF on one
命令,从而将其转换为主服务器。
挑选的规则是选择在线的,健康的,优先级高的,且复制偏移量最大的从服务器,这样可以保证从服务器的数据最全。
当向从服务器发送的INFO
命令解析得到服务器的role
变为master时,说明从服务器已经升级为主服务器了。
当选举出新主服务器后,领头Sentinel
下一步要做的是让已下线主服务器的从服务器复制新的主服务器。这一步可以通过Sentinel
发送SLAVEOF
命令实现。
最后一步需要做的是将已下线的主服务器成为新主服务器的从服务器。由于旧的主服务器已经下线,因此会在对应的sentinelRedisInstance
保存退化的操作,等到该服务器再次上线时,Sentinel
就会向它发送SLAVEOF
命令,让它成为新主服务器的从服务器。
Redis 集群是 Redis 提供的分布式数据库方案,集群通过分片来进行数据共享,并提供复制和故障转移功能。
一个 Redis 集群通常由多个节点( node )组成。开始时,每个节点互相独立,处于在只包含自己的集群中。我们可以通过以下命令让独立的节点连接,构成一个包含多节点的集群:
CLUSTER MEET <ip> <port>
向某个节点发送CLUSTER MEET
命令,会让该节点向 ip 和 port 指定的节点发送握手。当握手成功时,node
节点会将 ip 和 port 所指定的节点添加到当前所在集群中。
集群中添加节点的过程:
集群模式下的节点本质上还是一个 Redis 服务器,因此启动过程和普通 Redis 服务器一致,都是由main
方法启动,并开始加载,但是一个特殊的操作会在initServer()
中的clusterInit()
中做初始化。
void clusterInit(void) {
int saveconf = 0;
//初始化ClusterState
server.cluster = zmalloc(sizeof(clusterState));
server.cluster->myself = NULL;
server.cluster->currentEpoch = 0;
server.cluster->state = REDIS_CLUSTER_FAIL;
server.cluster->size = 1;
server.cluster->todo_before_sleep = 0;
server.cluster->nodes = dictCreate(&clusterNodesDictType,NULL);
server.cluster->nodes_black_list =
dictCreate(&clusterNodesBlackListDictType,NULL);
server.cluster->failover_auth_time = 0;
server.cluster->failover_auth_count = 0;
server.cluster->failover_auth_rank = 0;
server.cluster->failover_auth_epoch = 0;
server.cluster->cant_failover_reason = REDIS_CLUSTER_CANT_FAILOVER_NONE;
server.cluster->lastVoteEpoch = 0;
server.cluster->stats_bus_messages_sent = 0;
server.cluster->stats_bus_messages_received = 0;
memset(server.cluster->slots,0, sizeof(server.cluster->slots));
clusterCloseAllSlots();
/* Lock the cluster config file to make sure every node uses
* its own nodes.conf. */
if (clusterLockConfig(server.cluster_configfile) == REDIS_ERR)
exit(1);
/* Load or create a new nodes configuration. */
if (clusterLoadConfig(server.cluster_configfile) == REDIS_ERR) {
/* No configuration found. We will just use the random name provided
* by the createClusterNode() function. */
myself = server.cluster->myself =
createClusterNode(NULL,REDIS_NODE_MYSELF|REDIS_NODE_MASTER);
redisLog(REDIS_NOTICE,"No cluster configuration found, I'm %.40s",
myself->name);
clusterAddNode(myself);
saveconf = 1;
}
if (saveconf) clusterSaveConfigOrDie(1);
/* We need a listening TCP port for our cluster messaging needs. */
server.cfd_count = 0;
/* Port sanity check II
* The other handshake port check is triggered too late to stop
* us from trying to use a too-high cluster port number. */
if (server.port > (65535-REDIS_CLUSTER_PORT_INCR)) {
redisLog(REDIS_WARNING, "Redis port number too high. "
"Cluster communication port is 10,000 port "
"numbers higher than your Redis port. "
"Your Redis port number must be "
"lower than 55535.");
exit(1);
}
if (listenToPort(server.port+REDIS_CLUSTER_PORT_INCR,
server.cfd,&server.cfd_count) == REDIS_ERR)
{
exit(1);
} else {
int j;
for (j = 0; j < server.cfd_count; j++) {
if (aeCreateFileEvent(server.el, server.cfd[j], AE_READABLE,
clusterAcceptHandler, NULL) == AE_ERR)
redisPanic("Unrecoverable error creating Redis Cluster "
"file event.");
}
}
/* The slots -> keys map is a sorted set. Init it. */
server.cluster->slots_to_keys = zslCreate();
/* Set myself->port to my listening port, we'll just need to discover
* the IP address via MEET messages. */
myself->port = server.port;
server.cluster->mf_end = 0;
resetManualFailover();
}
该主要作用是初始ClusterState
结构体并创建表示自身节点的ClsuterNode
。
除此之外,在serverCron()
中,集群模式下的 Redis 服务器还有特定周期处理事件clusterCron()
函数。
ClusterState
redisServer
结构中会有一个指向ClusterState
的指针,而ClusterState
结构就是用来记录集群状态的:typedef struct clusterState {
//指向自身节点
clusterNode *myself; /* This node */
//当前纪元
uint64_t currentEpoch;
//集群状态
int state; /* REDIS_CLUSTER_OK, REDIS_CLUSTER_FAIL, ... */
//集群规模,只统计主节点数
int size;
/* Num of master nodes with at least one slot */
//集群中的节点
dict *nodes; /* Hash table of name -> clusterNode structures */
dict *nodes_black_list; /* Nodes we don't re-add for a few seconds. */
//准备迁移的SLOTS
clusterNode *migrating_slots_to[REDIS_CLUSTER_SLOTS];
//准备导入的SLOTS
clusterNode *importing_slots_from[REDIS_CLUSTER_SLOTS];
//记录每个SLOT对应哪个节点
clusterNode *slots[REDIS_CLUSTER_SLOTS];
//借助跳表组织数据库键(分值为键的槽位),可以快速确定某个槽对应哪些键
zskiplist *slots_to_keys;
/* The following fields are used to take the slave state on elections. */
mstime_t failover_auth_time; /* Time of previous or next election. */
int failover_auth_count; /* Number of votes received so far. */
int failover_auth_sent; /* True if we already asked for votes. */
int failover_auth_rank; /* This slave rank for current auth request. */
uint64_t failover_auth_epoch; /* Epoch of the current election. */
int cant_failover_reason; /* Why a slave is currently not able to
failover. See the CANT_FAILOVER_* macros. */
/* Manual failover state in common. */
mstime_t mf_end; /* Manual failover time limit (ms unixtime).
It is zero if there is no MF in progress. */
/* Manual failover state of master. */
clusterNode *mf_slave; /* Slave performing the manual failover. */
/* Manual failover state of slave. */
long long mf_master_offset; /* Master offset the slave needs to start MF
or zero if stil not received. */
int mf_can_start; /* If non-zero signal that the manual failover
can start requesting masters vote. */
/* The followign fields are used by masters to take state on elections. */
uint64_t lastVoteEpoch; /* Epoch of the last vote granted. */
int todo_before_sleep; /* Things to do in clusterBeforeSleep(). */
long long stats_bus_messages_sent; /* Num of msg sent via cluster bus. */
long long stats_bus_messages_received; /* Num of msg rcvd via cluster bus.*/
} clusterState;
clusterNode
clusterNode
表示集群中的一个节点typedef struct clusterNode {
//创建时间
mstime_t ctime; /* Node object creation time. */
//节点名称
char name[REDIS_CLUSTER_NAMELEN]; /* Node name, hex string, sha1-size */
//节点标志
int flags; /* REDIS_NODE_... */
uint64_t configEpoch; /* Last configEpoch observed for this node */
//记录该节点分配到的SLOT
unsigned char slots[REDIS_CLUSTER_SLOTS/8]; /* slots handled by this node */
//该节点负责的槽
int numslots; /* Number of slots handled by this node */
//该节点的从节点数
int numslaves; /* Number of slave nodes, if this is a master */
//从节点的指针
struct clusterNode **slaves; /* pointers to slave nodes */
//指向主节点
struct clusterNode *slaveof; /* pointer to the master node */
mstime_t ping_sent; /* Unix time we sent latest ping */
mstime_t pong_received; /* Unix time we received the pong */
mstime_t fail_time; /* Unix time when FAIL flag was set */
mstime_t voted_time; /* Last time we voted for a slave of this master */
mstime_t repl_offset_time; /* Unix time we received offset for this node */
long long repl_offset; /* Last known repl offset for this node. */
char ip[REDIS_IP_STR_LEN]; /* Latest known IP address of this node */
int port; /* Latest known port of this node */
//节点的连接
clusterLink *link; /* TCP/IP link with this node */
list *fail_reports; /* List of nodes signaling this as failing */
} clusterNode;
clusterLink
clusterNode
结构中会有一个指向clusterLink
的指针,代表当前节点与该节点间的连接。typedef struct clusterLink {
mstime_t ctime; /* Link creation time */
//TCP Socket的文件描述符
int fd; /* TCP socket file descriptor */
//输入输出缓冲区
sds sndbuf; /* Packet send buffer */
sds rcvbuf; /* Packet reception buffer */
//对应的节点
struct clusterNode *node; /* Node related to this link if any, or NULL */
} clusterLink;
-CLUSTER MEET
的流程
A
发送CLUSTER MEET
消息,A
收到消息后,为新节点B
创建clusterNode
结构,并根据命令指示的ip
和port
向节点B
发送MEET
消息B
收到A
发送的消息后,为A
创建一个clusterNode
的结构,并回复一个PONG
消息作为收到MEET
命令的响应A
收到PONG
消息后,再向B
节点发送PING
消息作为收到PONG
的响应,当B
收到PING
后,确认A
已经收到之前发送的PONG
。TCP
握手过程和CLUSTER MEET
过程对比
让集群删除节点可以通过
CLUSTER FORGET <node_name>
来实现,其中的node_name
可以通过CLUSTER NODES
命令去查询,CLUSTER FORGET
的作用是让执行命令的节点的nodes
中移除对应name
的节点,由于集群中的节点间存在GOSSIP
协议,如果一段周期内,集群中才存在节点没FORGET掉节点,那么被删除的节点仍然会被加入到集群中。除此之外,当被删除的节点重启后,由于该节点的配置中记录了集群的信息,它仍会接入集群,因此需要在再次启动前删除对应的节点配置文件nodes.conf
文件。
Redis 集群通过分片的方式保存数据库中的键值对:整个集群被划分为16384
个槽,集群中的每个节点可以负责处理部分或全部的槽。 当16384
个槽全被节点认领后,集群才处于上线状态(OK
),否则集群处于离线状态(FAIL
)。
我们可以通过CLUSTER INFO
命令,查询集群状态,确认下槽分配完前后的区别:
为节点分配槽可以通过CLUSTER ADDSLOTS <slot> [slot ...]
命令。
上文已经介绍过clusterNode
中会用unsigned char slot[16384/8]
来表示某个节点负责哪些操。虽然该字段为一个字节数组,但是其用每一位表示一个槽,0
表示不负责该槽,1
表示负责该槽。
而numslots
表示该节点负责的槽数量。
节点会通过消息将自己负责的槽信息发送给集群中其他节点。当其它节点收到该消息后,会更新clusterState.nodes
中对应clusterNode
的结构。
clusterState
结构中的slots
数组会记录某个槽负责的节点。
clusterState
用clusterNode slots
数组记录每个槽负责的节点(slot -> node),又会在clusterNode
中用unsigned char slots
数组记录每个节点负责的槽(node -> slot)。这样无论是想确认某个槽对应的节点,还是想确认某个节点处理的槽,时间复杂度都很小。
除此之外,clusterState
还会通过跳表记录键和槽的对应关系。方便快速查询某个槽包含哪些键。
以下图片展示了三主三从情况下,某个主节点clusterNode
和clusterState
的结构:
当集群中的槽都被分配完后,集群就会进入上线状态。此时,客户端就可以向集群发送数据命令了。
CLUSTER KEYSLOT
命令查询)。MOVED
错误,其中MOVE
错误的格式为MOVED <slot> <ip> <port>
。Redis集群的重新分片操作可以将任意数量已分配的给某个节点(源节点)的槽指派给新的节点(目标节点),并且槽中的键值对也会转移的目标节点。重新分片的流程如下:
CLUSTER SETSLOT <slot> IMPORTING <source_id>
,告知目标节点准备导入槽(目标节点的clusterNode
的importing_slots_from[16384]
数组相应槽的位置会指向源节点)CLUSTER SETSLOT <slot> MIGRATING <target_id>
,告知源节点准备迁移槽(源节点的clusterNode
的migrating_slots_to[16384]
数组相应槽的位置会指向目标节点)CLUSTER GETKEYINSLOT <slot> <count>
,逐步获取源节点中该槽存放的键MIGRATE <taget_ip> <target_port> <key_name> 0 <timeout>
,将该键值对迁移至目标节点CLUSTER SETSLOT <slot> NODE <target_id>
命令,说明该槽已经被指派给目标节点重新分片的流程如下:
考虑一种情况,在槽迁移过程中,一部份键仍在源节点,而另一部分键已经在目标节点。那么此时,客户端所要查询的键正好位于这个槽,那么客户端会如何处理?
ASK
错误,将客户端指向目标节点。ASK
错误的客户端先会向目标节点发送ASKING
命令,目标节点就知道客户端是由于ASK
错误而再次发起的查询(因为此时目标节点还不负责该槽,为了避免向目标节点发送MOVED
错误),之后再次向目标节点发送要执行的命令。集群中消息处理的总流程图如下:
集群中一个节点成为另一个节点的从节点可以通过命令CLUSTER REPLICATION <node_id>
实现。
A
发送CLUSTER REPLICATION <node_id>
命令后(其中node_id
为节点B
的ID)A
首先会根据node_id
找出节点B
对应的clusterState
,并将自己节点的slaveof
指向节点B
A
节点会将自己的flags
变成REDIS_NODE_SLAVE
A
节点会根据B
的clusterNode
结构中记录的IP
和端口
向B
发起复制过程A
就成为了B
的从节点,并通知集群中的其他节点集群中的节点会定期向其他节点发送PING
命令,如果在规定时期内没有收到PONG
响应,那么节点就会将接受消息的节点标为疑似下线,同时更新接受节点的flags
字段,打开PFAIL
标志。
在之后集群的消息互发过程中,会向其它节点报告疑似下线的消息,当其他节点收到疑似下线的消息时,会将疑似下线节点的clusterNode
中fail_reports
的链表中添加一个clusterNodeFailReport
节点:
typedef struct clusterNodeFailReport {
//发出疑似下线报告的节点
struct clusterNode *node; /* Node reporting the failure condition. */
//创建时间
mstime_t time; /* Time of the last report from this node. */
} clusterNodeFailReport;
当集群中半数以上负责处理槽的主节点都报告某个节点意思下线(clusterLink *link
的size
> clusterState.size / 2
),那么该节点就成为已下线状态(FAIL
)。发现某个节点下线的节点会向集群中广播一条节点下线的消息。
集群下的故障转移过程和Sentinel
下的故障转移过程相似:
SLAVEOF no one
,成为主节点上面介绍的集群中的节点会通过各种消息交换信息(比如PING
,PONG
等),这里说的PING
和PONG
并非客户端发送的PING
命令。
消息结构的定义如下:
typedef struct {
char sig[4]; /* Siganture "RCmb" (Redis Cluster message bus). */
//消息总长度
uint32_t totlen; /* Total length of this message */
//协议版本
uint16_t ver; /* Protocol version, currently set to 0. */
uint16_t notused0; /* 2 bytes not used. */
//消息类型
uint16_t type; /* Message type */
uint16_t count; /* Only used for some kind of messages. */
//当前纪元
uint64_t currentEpoch; /* The epoch accordingly to the sending node. */
uint64_t configEpoch; /* The config epoch if it's a master, or the last
epoch advertised by its master if it is a
slave. */
//复制偏移量
uint64_t offset; /* Master replication offset if node is a master or
processed replication offset if node is a slave. */
//集群的名字
char sender[REDIS_CLUSTER_NAMELEN]; /* Name of the sender node */
//节点负责的槽
unsigned char myslots[REDIS_CLUSTER_SLOTS/8];
char slaveof[REDIS_CLUSTER_NAMELEN];
char notused1[32]; /* 32 bytes reserved for future usage. */
//TCP的端口
uint16_t port; /* Sender TCP base port */
//节点的标志
uint16_t flags; /* Sender node flags */
unsigned char state; /* Cluster state from the POV of the sender */
unsigned char mflags[3]; /* Message flags: CLUSTERMSG_FLAG[012]_... */
//消息体
union clusterMsgData data;
} clusterMsg;
我们可以看到上述结构中包含了节点的许多信息,比如负责处理的槽,节点复制偏移量,节点的标识位等等。因此集群中的节点能互相交换信息。而clusterMsgData
表示某个具体类型的消息。
union clusterMsgData {
/* PING, MEET and PONG */
struct {
/* Array of N clusterMsgDataGossip structures */
clusterMsgDataGossip gossip[1];
} ping;
/* FAIL */
struct {
clusterMsgDataFail about;
} fail;
/* PUBLISH */
struct {
clusterMsgDataPublish msg;
} publish;
/* UPDATE */
struct {
clusterMsgDataUpdate nodecfg;
} update;
};
消息主要分为五种类型:
MEET
消息:节点收到CLUSTER MEET
消息后,会向指定的消息发送MEET
命令,请求接收者加入集群PING
消息:节点随机选择五个节点并挑选出一个最长时间未发送节点发送PING
消息,除此之外,如果最后一次收到节点的PONG
消息超过了cluster-node-timeout
的一半,也会发送PING
消息PONG
消息:当节点收到PING
或是MEET
时,会发送PONG
消息作为响应。或是某个从节点成为了主节点后,会发送一次PONG
消息FAIL
消息:一个节点判断另一个节点已下线时,会向集群广播一个FAIL
消息PUBLISH
消息:一个节点收到PUBLISH
命令后,会向集群广播PUBLISH
消息,让集群中的其它节点同样执行一次PUBLISH
命令。MEET
,PING
和PONG
消息的实现MEET
,PING
和PONG
都通过gossip
协议来交换信息。gossip
协议通过点对点的信息交互实现集群中的信息同步。
MEET
,PING
和PONG
消息体都带有一个长度为 2 的clusterMsgDataGossip
的数组。
其中clusterMsgDataGossip
结构的定义如下:
typedef struct {
//节点名称
char nodename[REDIS_CLUSTER_NAMELEN];
//发送PING命令的时间
uint32_t ping_sent;
//收到PONG命令的时间
uint32_t pong_received;
//IP地址
char ip[REDIS_IP_STR_LEN]; /* IP address last time it was seen */
//端口
uint16_t port; /* port last time it was seen */
//标志位
uint16_t flags; /* node->flags copy */
uint16_t notused1; /* Some room for future improvements. */
uint32_t notused2;
} clusterMsgDataGossip;
当集群发送PING
,PONG
和MEET
消息时,会从clusterNode
的字典中随机挑选两个节点,并将其信息保存到clusterMsgDataGossip
中。这样就能通过消息交换节点信息。
接收到消息的节点会解析clusterMsgDataGossip
中的节点信息,如果是未发现的节点则建立连接,如果是已发送的节点,则同步信息。
FAIL
消息的实现FAIL
消息通过clusterMsgDataFail
结构记录消息节点的名字。
typedef struct {
//下线节点的name
char nodename[REDIS_CLUSTER_NAMELEN];
} clusterMsgDataFail;
PUBLISH
消息的实现PUBLISH
消息通过clusterMsgDataPublish
来记录一个PUBLISH
命令的channel
和message
。
typedef struct {
//channel字符串的长度
uint32_t channel_len;
//message字符串的长度
uint32_t message_len;
/* We can't reclare bulk_data as bulk_data[] since this structure is
* nested. The 8 bytes are removed from the count during the message
* length computation. */
//字符串数组,保存了channel和message
unsigned char bulk_data[8];
} clusterMsgDataPublish;
Redis(三):多机数据库的实现
标签:skin mmu 不一致 初始 建立连接 something 链表 分布式数据库 wait