时间:2021-07-01 10:21:17 帮助过:7人阅读
这里可以看到很多的init_*
或者server_init()
。通过名字我们可以猜测出,这里做了很多初始化的工作。例如:启动过程中一些初始化的检查和MySQL配置变量的加载和一些组件的初始化等。
这里重要的函数是handle_connections_sockets
继续跟踪/sql/mysqld.cc
handle_connections_sockets (arg __attribute__((unused))
{
if (ip_sock != INVALID_SOCKET)
{
FD_SET(ip_sock,&clientFDs);
DBUG_PRINT("general",("Waiting for connections."));
while (!abort_loop)
{
new_sock = accept(sock, my_reinterpret_cast(struct sockaddr*)
(&cAddr), &length);
thd= new THD;
if (sock == unix_sock)
thd->host=(char*) localhost;
create_new_thread(thd); // !
}
从简易的思维,忽视其他的判断语句。可以看到这里做的是典型的client/server架构。服务器有一个主线程,它总是侦听来自新客户机的请求。一旦它接收到这样的请求,它将分配资源。特别是,主线程将生成一个新线程来处理连接。然后主服务器将循环并侦听新连接——但我们将保留它并跟踪新线程。
这里创建新线程的方法是:create_new_thread(thd);
继续跟踪/sql/mysqld.cc
create_new_thread(THD *thd)
{
pthread_mutex_lock(&LOCK_thread_count);
pthread_create(&thd->real_id,&connection_attrib,
handle_one_connection, // !
(void*) thd));
pthread_mutex_unlock(&LOCK_thread_count);
}
可以看到这里获得一个新线程加入一个互斥锁,避免冲突。
继续跟踪/sql/mysqld.cc
handle_one_connection(THD *thd)
{
init_sql_alloc(&thd->mem_root, MEM_ROOT_BLOCK_SIZE, MEM_ROOT_PREALLOC);
while (!net->error && net->vio != 0 && !thd->killed)
{
if (do_command(thd)) // !
break;
}
close_connection(net);
end_thread(thd,1);
packet=(char*) net->read_pos;
从这里开始,我们即将脱离mysqld.cc
文件,因为我们获得了thread,且分配一小段内存资源,给与我们来处理我们的SQL语句了。
我们会走向何方呢,可以开始观察do_command(thd)
方法。
继续跟踪/sql/sql_parse.cc
bool do_command(THD *thd)
{
net_new_transaction(net);
packet_length=my_net_read(net);
packet=(char*) net->read_pos;
command = (enum enum_server_command) (uchar) packet[0];
dispatch_command(command,thd, packet+1, (uint) packet_length);
// !
}
其中从这里可以看到,do_command(THD *thd)
把它串联起来的是一个叫作THD的东西,也就是thread。所以后面的工作和行为,基本都是通过thread进行牵线搭桥的。
my_net_read函数位于另一个名为net_servlet .cc的文件中。该函数从客户端获取一个包,解压缩它,并去除头部。
一旦完成,我们就得到了一个名为packet的多字节变量,它包含客户端发送的内容。第一个字节很重要,因为它包含标识消息类型的代码。
说明了packet第一个字节很重要。debug也有证据进行一个佐证。
packet_header: Memory: 0x7f7fc000a4b0 Bytes: (4)
21 00 00 00
然后把packet第一个字节和余下的部分传递给dispatch_command
继续跟踪/sql/sql_parse.cc
bool dispatch_command(enum enum_server_command command, THD *thd,
char* packet, uint packet_length)
{
switch (command) {
case COM_INIT_DB: ...
case COM_REGISTER_SLAVE: ...
case COM_TABLE_DUMP: ...
case COM_CHANGE_USER: ...
case COM_EXECUTE:
mysql_stmt_execute(thd,packet);
case COM_LONG_DATA: ...
case COM_PREPARE:
mysql_stmt_prepare(thd, packet, packet_length); // !
/* and so on for 18 other cases */
default:
send_error(thd, ER_UNKNOWN_COM_ERROR);
break;
}
这里sql_parser .cc中有一个非常大的switch语句
switch语句中代码有:code for prepare, close statement, query, quit, create database, drop database, dump binary log, refresh, statistics, get process info, kill process, sleep, connect, and several minor commands
除了COM_EXECUTE和COM_PREPARE两种情况外,我们删除了所有情况下的代码细节。
可以看到
COM_EXECUTE 会调用mysql_stmt_execute(thd,packet);
COM_PREPARE 会调用mysql_stmt_prepare(thd, packet, packet_length);
这里就像一个中转站一般,看我们去向什么地方。这里去的门是:COM_PREPARE:mysql_stmt_prepare
跟踪/sql/sql_prepare.cc
下面是一段prepare的注释
"Prepare:
Parse the query
Allocate a new statement, keep it in ‘thd->prepared statements‘ pool
Return to client the total number of parameters and result-set
metadata information (if any)"
继续回到主线COM_EXECUTE
跟踪/sql/sql_parse.cc
bool dispatch_command(enum enum_server_command command, THD *thd,
char* packet, uint packet_length)
{
switch (command) {
case COM_INIT_DB: ...
case COM_REGISTER_SLAVE: ...
case COM_TABLE_DUMP: ...
case COM_CHANGE_USER: ...
case COM_EXECUTE:
mysql_stmt_execute(thd,packet); // !
case COM_LONG_DATA: ...
case COM_PREPARE:
mysql_stmt_prepare(thd, packet, packet_length);
/* and so on for 18 other cases */
default:
send_error(thd, ER_UNKNOWN_COM_ERROR);
break;
}
现在``COM_EXECUTE 中的
mysql_stmt_execute`是我们关注的重点,我们来看看
跟踪/sql/sql_prepare.cc
代码
void mysql_stmt_execute(THD *thd, char *packet)
{
if (!(stmt=find_prepared_statement(thd, stmt_id, "execute")))
{
send_error(thd);
DBUG_VOID_RETURN;
}
init_stmt_execute(stmt);
mysql_execute_command(thd); // !
}
这里做一个判断,看是否是execute
,然后初始化语句,并开始执行mysql_execute_command(thd);
可以看到,是通过thread来调用动作。
跟踪/sql/sql_parse.cc
代码
void mysql_execute_command(THD *thd)
switch (lex->sql_command) {
case SQLCOM_SELECT: ...
case SQLCOM_SHOW_ERRORS: ...
case SQLCOM_CREATE_TABLE: ...
case SQLCOM_UPDATE: ...
case SQLCOM_INSERT: ... // !
case SQLCOM_DELETE: ...
case SQLCOM_DROP_TABLE: ...
}
lex 解析sql语句。然后进入SQLCOM_INSERT。
跟踪/sql/sql_parse.cc
代码
case SQLCOM_INSERT:
{
my_bool update=(lex->value_list.elements ? UPDATE_ACL : 0);
ulong privilege= (lex->duplicates == DUP_REPLACE ?
INSERT_ACL | DELETE_ACL : INSERT_ACL | update);
if (check_access(thd,privilege,tables->db,&tables->grant.privilege))
goto error;
if (grant_option && check_grant(thd,privilege,tables))
goto error;
if (select_lex->item_list.elements != lex->value_list.elements)
{
send_error(thd,ER_WRONG_VALUE_COUNT);
DBUG_VOID_RETURN;
}
res = mysql_insert(thd,tables,lex->field_list,lex->many_values,
select_lex->item_list, lex->value_list,
(update ? DUP_UPDATE : lex->duplicates));
// !
if (thd->net.report_error)
res= -1;
break;
}
对于插入数据,我们要做的第一件事情是:检查用户是否具有对表进行插入的适当特权,服务器通过调用check_access和check_grant函数在这里进行检查。
有了权限才可以做【插入】动作。
我们可以导航 /sql 目录,如下:
Program Name SQL statement type
------------ ------------------
sql_delete.cc DELETE
sql_do.cc DO
sql_handler.cc HANDLER
sql_help.cc HELP
sql_insert.cc INSERT // !
sql_load.cc LOAD
sql_rename.cc RENAME
sql_select.cc SELECT
sql_show.cc SHOW
sql_update.cc UPDATE
sql_insert.cc
是具体执行插入的操作。
上面的mysql_insert() 的方法具体实现,在sql_insert.cc
文件中。
跟踪 /sql/sql_insert.cc
代码
int mysql_insert(THD *thd,TABLE_LIST *table_list, List<Item> &fields,
List<List_item> &values_list,enum_duplicates duplic)
{
table = open_ltable(thd,table_list,lock_type);
if (check_insert_fields(thd,table,fields,*values,1) ||
setup_tables(table_list) ||
setup_fields(thd,table_list,*values,0,0,0))
goto abort;
fill_record(table->field,*values);
error=write_record(table,&info); // !
query_cache_invalidate3(thd, table_list, 1);
if (transactional_table)
error=ha_autocommit_or_rollback(thd,error);
query_cache_invalidate3(thd, table_list, 1);
mysql_unlock_tables(thd, thd->lock);
}
这里就要开始,打开一张表。然后各种检查,看插入表的字段是否有问题。不行就abort。
然后,开始填充记录数据。最终调用write_record 写记录的方法。
由于write_record
会对应不同的存储引擎,所以这里有分支的。我这里讲解两种
继续跟踪/sql/sql_insert.cc
int write_record(TABLE *table,COPY_INFO *info)
{
table->file->write_row(table->record[0]; // !
}
终于,要写文件了。调用那个存储引擎呢?看handler.h
/* The handler for a table type.
Will be included in the TABLE structure */
handler(TABLE *table_arg) :
table(table_arg),active_index(MAX_REF_PARTS),
ref(0),ref_length(sizeof(my_off_t)),
block_size(0),records(0),deleted(0),
data_file_length(0), max_data_file_length(0),
index_file_length(0),
delete_length(0), auto_increment_value(0), raid_type(0),
key_used_on_scan(MAX_KEY),
create_time(0), check_time(0), update_time(0), mean_rec_length(0),
ft_handler(0)
{}
...
virtual int write_row(byte * buf)=0;
官方文档默认调用的是ha_myisam::write_row
代码 /sql/ha_myisam.cc
如下:
int ha_myisam::write_row(byte * buf)
{
statistic_increment(ha_write_count,&LOCK_status);
/* If we have a timestamp column, update it to the current time */
if (table->time_stamp)
update_timestamp(buf+table->time_stamp-1);
/*
If we have an auto_increment column and we are writing a changed row
or a new row, then update the auto_increment value in the record.
*/
if (table->next_number_field && buf == table->record[0])
update_auto_increment();
return mi_write(file,buf); // !
}
这些以字母ha开头的程序是处理程序的接口,而这个程序是myisam处理程序的接口。我们这里就开始调用MyISAM了。
可以看到这里调用了mi_write(file,buf);
跟踪/myisam/mi_write.c
int mi_write(MI_INFO *info, byte *record)
{
_mi_readinfo(info,F_WRLCK,1);
_mi_mark_file_changed(info);
/* Calculate and check all unique constraints */
for (i=0 ; i < share->state.header.uniques ; i++)
{
mi_check_unique(info,share->uniqueinfo+i,record,
mi_unique_hash(share->uniqueinfo+i,record),
HA_OFFSET_ERROR);
}
... to be continued in next snippet
这里有很多唯一性的校验,继续看下面
... continued from previous snippet
/* Write all keys to indextree */
for (i=0 ; i < share->base.keys ; i++)
{
share->keyinfo[i].ck_insert(info,i,buff,
_mi_make_key(info,i,buff,record,filepos)
}
(*share->write_record)(info,record);
if (share->base.auto_key)
update_auto_increment(info,record);
}
这里就是我们写入到文件的地方。至此,MySQL的插入操作结束。
路径为:
main in /sql/mysqld.cc
handle_connections_sockets in /sql/mysqld.cc
create_new_thread in /sql/mysqld.cc
handle_one_connection in /sql/sql_parse.cc
do_command in /sql/sql_parse.cc
dispatch_command in /sql/sql_parse.cc
mysql_stmt_execute in /sql/sql_prepare.cc
mysql_execute_command in /sql/sql_parse.cc
mysql_insert in /sql/mysql_insert.cc
write_record in /sql/mysql_insert.cc
ha_myisam::write_row in /sql/ha_myisam.cc
mi_write in /myisam/mi_write.c
1.进入主函数入口
2.建立socket connection的请求
3.创建一个新的线程
4.处理线程,分配内存资源
5.do_command,是获取packet第一字节,看做什么操作,并接受余下字节。
6.dispatch_command,分发操作,这里分发的是insert。
7.mysql_stmt_execute,检查是否为execute,初始化,准备做execute动作。
8.mysql_execute_command ,lex解析SQL语句,进入到SQLCOM_INSERT
9.mysql_insert ,开始做插入操作。调用write_record
10.write_record,准备写入,看调用哪个存储引擎,写入前期准备工作
11.ha_myisam::write_row,ha_myisam进行插入写入。
12.mi_write,最后做写入操作。
进入MySQL大门的地方。学习开始,JSP语言都是需要main方法作为主入口
标签:头部 pthread 文件的 mysql数据库 资源 ada seve 数据库 hash