在认证成功后Cobar会将该连接的回调处理函数由FrontendAuthenticator(前端认证处理器)设置成FrontendCommandHanler(前端命令处理器)。 所以在客户端再次向Cobar发送请求报文的时候,前端命令处理器会处理该连接。下面详细分析一下简单select语句的执行过
在认证成功后Cobar会将该连接的回调处理函数由FrontendAuthenticator(前端认证处理器)设置成FrontendCommandHanler(前端命令处理器)。
所以在客户端再次向Cobar发送请求报文的时候,前端命令处理器会处理该连接。下面详细分析一下简单select语句的执行过程。
1、事件的产生
NIOReactor的R线程一直在监听selector上的每个连接的感兴趣事件是否发生,当客户端发送了一条select * from tb1,select函数会返回,然后获取到该连接SelectionKey,并且该SelectKey的兴趣事件是OP_READ。此时会调用read(NIOConnection)函数。
02 |
final Selector selector = this.selector;
|
06 |
int res = selector.select();
|
07 |
LOGGER.debug(reactCount + ">>NIOReactor接受连接数:" + res);
|
09 |
Set keys = selector.selectedKeys();
|
11 |
for (SelectionKey key : keys) {
|
12 |
Object att = key.attachment();
|
13 |
if (att != null && key.isValid()) {
|
14 |
int readyOps = key.readyOps();
|
15 |
if ((readyOps & SelectionKey.OP_READ) != 0) {
|
16 |
LOGGER.debug("select读事件");
|
17 |
read((NIOConnection) att);
|
18 |
..............................
|
20 |
...........................
|
2、调用该连接的read函数进行处理
该函数在上一篇中提到过,该函数的实现在AbstractConnection中,实现从channel中读取数据到缓冲区,然后从缓冲区完整的取出整包数据交给FrontendConnection类的handle()函数处理。
该函数交给processor进行异步处理。从processor中的线程池获取一个线程来执行该任务。这里调用具体的handler来进行处理。
刚开始提到的,当认证成功后,Cobar将连接的回调处理函数设置为FrontendCommandHandler。所以这里会调用前端命令处理器的handler函数进行数据的处理。
在这里需要先了解MySQL数据包的格式:
MySQL客户端命令请求报文
该处理函数如下:
01 |
public void handle(byte[] data) { |
02 |
LOGGER.info("data[4]:"+data[4]);
|
04 |
case MySQLPacket.COM_INIT_DB:
|
08 |
case MySQLPacket.COM_QUERY:
|
12 |
case MySQLPacket.COM_PING:
|
16 |
case MySQLPacket.COM_QUIT:
|
20 |
case MySQLPacket.COM_PROCESS_KILL:
|
24 |
case MySQLPacket.COM_STMT_PREPARE:
|
25 |
commands.doStmtPrepare();
|
26 |
source.stmtPrepare(data);
|
28 |
case MySQLPacket.COM_STMT_EXECUTE:
|
29 |
commands.doStmtExecute();
|
30 |
source.stmtExecute(data);
|
32 |
case MySQLPacket.COM_STMT_CLOSE:
|
33 |
commands.doStmtClose();
|
34 |
source.stmtClose(data);
|
36 |
case MySQLPacket.COM_HEARTBEAT:
|
37 |
commands.doHeartbeat();
|
38 |
source.heartbeat(data);
|
42 |
source.writeErrMessage(ErrorCode.ER_UNKNOWN_COM_ERROR, "Unknown command");
|
由于每个报文都有消息头,消息头固定的是4个字节,前3个字节是消息长度,后面的一个字节是报文序号,如下所示
所以data[4]是第五个字节。也就是消息体的第一个字节。客户端向Cobar端发送的是命令报文,第一个字节是具体的命令。
如果是select语句,那么data[4]就是COM_QUERY,然后会调用具体连接的query成员函数,其定义在FrontendConnection类中。
01 |
public void query(byte[] data) { |
02 |
if (queryHandler != null) {
|
04 |
MySQLMessage mm = new MySQLMessage(data);
|
08 |
sql = mm.readString(charset);
|
09 |
} catch (UnsupportedEncodingException e) {
|
10 |
writeErrMessage(ErrorCode.ER_UNKNOWN_CHARACTER_SET, "Unknown charset '" + charset + "'");
|
13 |
if (sql == null || sql.length() == 0) {
|
14 |
writeErrMessage(ErrorCode.ER_NOT_ALLOWED_COMMAND, "Empty SQL");
|
17 |
LOGGER.debug("解析的SQL语句:"+sql);
|
19 |
queryHandler.query(sql);
|
21 |
writeErrMessage(ErrorCode.ER_UNKNOWN_COM_ERROR, "Query unsupported!");
|
首先新建一个MySQLMessage对象,将数据包的索引位置定位到第6个字节位置处。然后将后面的所有的字节读取成指定编码格式的SQL语句,这里就形成了完整的SQL语句。
查询的时候Cobar控制台输出如下内容:
11:35:33,392 INFO data[4]:3
解析出SQL语句后交给queryHandler处理。该对象是在新建连接的时候设置的ServerQueryHandler类,其实现的query函数如下:
01 |
public void query(String sql) { |
03 |
ServerConnection c = this.source;
|
04 |
if (LOGGER.isDebugEnabled()) {
|
05 |
LOGGER.debug(new StringBuilder().append(c).append(sql).toString());
|
08 |
int rs = ServerParse.parse(sql);
|
10 |
.......................
|
11 |
case ServerParse.SELECT:
|
13 |
SelectHandler.handle(sql, c, rs >>> 8);
|
15 |
.......................
|
首先对SQL语句进程解析,通过parse函数对语句解析后返回语句类型的编号。
如果语句没有语法错误,则直接交给SelectHandler进行处理。如果是一般的select语句,则直接调用ServerConnection的execute执行sql
c.execute(stmt, ServerParse.SELECT);
在ServerConnection中的execute函数中需要进行路由检查,因为select的数据不一定在一个数据库中,需要按拆分的规则进行路由的检查。
2 |
RouteResultset rrs = null; |
4 |
rrs = ServerRouter.route(schema, sql, this.charset, this);
|
5 |
LOGGER.debug("路由计算结果:"+rrs.toString());
|
具体的路由算法也是比较复杂,以后会专门分析。
Cobar的DEBUG控制台输出路由的计算结果如下:
11:35:33,392 DEBUG 路由计算结果:select * from tb2, route={
该条SQL语句的select内容分布在dnTset2和dnTest3中,所以要分别向这两个数据库进行查询。
经过比较复杂的资源处理最后在每个后端数据库上执行函数execute0。
01 |
private void execute0(RouteResultsetNode rrn, Channel c, boolean autocommit, BlockingSession ss, int flag) { |
02 |
ServerConnection sc = ss.getSource();
|
03 |
.........................
|
06 |
BinaryPacket bin = ((MySQLChannel) c).execute(rrn, sc, autocommit);
|
08 |
final ReentrantLock lock = MultiNodeExecutor.this.lock;
|
11 |
switch (bin.data[0]) {
|
12 |
case ErrorPacket.FIELD_COUNT:
|
14 |
handleFailure(ss, rrn, new BinaryErrInfo((MySQLChannel) c, bin, sc, rrn));
|
16 |
case OkPacket.FIELD_COUNT:
|
17 |
OkPacket ok = new OkPacket();
|
19 |
affectedRows += ok.affectedRows;
|
21 |
if (ok.insertId > 0) {
|
22 |
insertId = (insertId == 0) ? ok.insertId : Math.min(insertId, ok.insertId);
|
25 |
handleSuccessOK(ss, rrn, autocommit, ok);
|
28 |
final MySQLChannel mc = (MySQLChannel) c;
|
32 |
switch (bin.data[0]) {
|
33 |
case ErrorPacket.FIELD_COUNT:
|
35 |
handleFailure(ss, rrn, new BinaryErrInfo(mc, bin, sc, rrn));
|
37 |
case EOFPacket.FIELD_COUNT:
|
38 |
handleRowData(rrn, c, ss);
|
45 |
bin.packetId = ++packetId;
|
46 |
List headerList = new LinkedList();
|
50 |
switch (bin.data[0]) {
|
51 |
case ErrorPacket.FIELD_COUNT:
|
53 |
handleFailure(ss, rrn, new BinaryErrInfo(mc, bin, sc, rrn));
|
55 |
case EOFPacket.FIELD_COUNT:
|
56 |
bin.packetId = ++packetId;
|
57 |
for (MySQLPacket packet : headerList) {
|
58 |
buffer = packet.write(buffer, sc);
|
61 |
buffer = bin.write(buffer, sc);
|
63 |
handleRowData(rrn, c, ss);
|
66 |
bin.packetId = ++packetId;
|
68 |
case RouteResultset.REWRITE_FIELD:
|
69 |
StringBuilder fieldName = new StringBuilder();
|
70 |
fieldName.append("Tables_in_").append(ss.getSource().getSchema());
|
71 |
FieldPacket field = PacketUtil.getField(bin, fieldName.toString());
|
72 |
headerList.add(field);
|
这里真正的执行SQL语句,然后等待后端执行语句的返回数据,在成功获取后端Mysql返回的结果后,该函数返回的数据包是结果集数据包。
当客户端发起认证请求或命令请求后,服务器会返回相应的执行结果给客户端。客户端在收到响应报文后,需要首先检查第1个字节的值,来区分响应报文的类型。
响应报文类型 |
第1个字节取值范围 |
OK 响应报文 |
0×00 |
Error 响应报文 |
0xFF |
Result Set 报文 |
0×01 – 0xFA |
Field 报文 |
0×01 – 0xFA |
Row Data 报文 |
0×01 – 0xFA |
EOF 报文 |
0xFE |
注:响应报文的第1个字节在不同类型中含义不同,比如在OK报文中,该字节并没有实际意义,值恒为0×00;而在Result Set报文中,该字节又是长度编码的二进制数据结构(Length Coded Binary)中的第1字节。
Result Set 消息分为五部分,结构如下:
结构 |
说明 |
[Result Set Header] |
列数量 |
[Field] |
列信息(多个) |
[EOF] |
列结束 |
[Row Data] |
行数据(多个) |
[EOF] |
数据结束 |
函数执行完成后,返回的结果都放入LinkedList中,当读取结果完成后放入多节点执行器的缓冲区。如果buffer满了,就通过前端连接写出给客户端。