当前位置:Gxlcms > mysql > memcached探索之threadmodel(2)

memcached探索之threadmodel(2)

时间:2021-07-01 10:21:17 帮助过:35人阅读

6. 下面我们看看主线程dispatch_thread的事件处理设置。在memcached.c中的main函数中 /* create unix mode sockets after dropping privileges */ if ( settings . socketpath ! = NULL ) { errno = 0 ; if ( server_socket_unix ( settings . socketpath ,

6. 下面我们看看主线程dispatch_thread的事件处理设置。在memcached.c中的main函数中

/* create unix mode sockets after dropping privileges */
if (settings.socketpath != NULL) {
errno = 0;
if (server_socket_unix(settings.socketpath,settings.access)) {
vperror("failed to listen on UNIX socket: %s", settings.socketpath);
exit(EX_OSERR);
}
}

/* create the listening socket, bind it, and init */
if (settings.socketpath == NULL) {
int udp_port;

const char *portnumber_filename = getenv("MEMCACHED_PORT_FILENAME");
char temp_portnumber_filename[PATH_MAX];
FILE *portnumber_file = NULL;

if (portnumber_filename != NULL) {
snprintf(temp_portnumber_filename,
sizeof(temp_portnumber_filename),
"%s.lck", portnumber_filename);

portnumber_file = fopen(temp_portnumber_filename, "a");
if (portnumber_file == NULL) {
fprintf(stderr, "Failed to open \"%s\": %s\n",
temp_portnumber_filename, strerror(errno));
}
}

errno = 0;
if (settings.port && server_socket(settings.port, tcp_transport,
portnumber_file)) {
vperror("failed to listen on TCP port %d", settings.port);
exit(EX_OSERR);
}

/*
* initialization order: first create the listening sockets
* (may need root on low ports), then drop root if needed,
* then daemonise if needed, then init libevent (in some cases
* descriptors created by libevent wouldn't survive forking).
*/

udp_port = settings.udpport ? settings.udpport : settings.port;

/* create the UDP listening socket and bind it */
errno = 0;
if (settings.udpport && server_socket(settings.udpport, udp_transport,
portnumber_file)) {
vperror("failed to listen on UDP port %d", settings.udpport);
exit(EX_OSERR);
}

if (portnumber_file) {
fclose(portnumber_file);
rename(temp_portnumber_filename, portnumber_filename);
}
}
/* Drop privileges no longer needed */
drop_privileges();
/* enter the event loop */
event_base_loop(main_base, 0); //主线程(dispatcher_thread)的事件监听循环。。。



7. 继续跟踪server_socket函数(memcached.c中)

/**
* Create a socket and bind it to a specific port number
* @param port the port number to bind to
* @param transport the transport protocol (TCP / UDP)
* @param portnumber_file A filepointer to write the port numbers to
* when they are successfully added to the list of ports we
* listen on.
*/

static int server_socket(int port, enum network_transport transport,
FILE *portnumber_file) {
int sfd;
struct linger ling = {0, 0};
struct addrinfo *ai;
struct addrinfo *next;
struct addrinfo hints = { .ai_flags = AI_PASSIVE,
.ai_family = AF_UNSPEC };
char port_buf[NI_MAXSERV];
int error;
int success = 0;
int flags =1;

hints.ai_socktype = IS_UDP(transport) ? SOCK_DGRAM : SOCK_STREAM;

if (port == -1) {
port = 0;
}
snprintf(port_buf, sizeof(port_buf), "%d", port);
error= getaddrinfo(settings.inter, port_buf, &hints, &ai);
if (error != 0) {
if (error != EAI_SYSTEM)
fprintf(stderr, "getaddrinfo(): %s\n", gai_strerror(error));
else
perror("getaddrinfo()");
return 1;
}

for (next= ai; next; next= next->ai_next) {
conn *listen_conn_add;
if ((sfd = new_socket(next)) == -1) { //creat socket
/* getaddrinfo can return "junk" addresses,
* we make sure at least one works before erroring.
*/

continue;
}

#ifdef IPV6_V6ONLY
if (next->ai_family == AF_INET6) {
error = setsockopt(sfd, IPPROTO_IPV6, IPV6_V6ONLY, (char *) &flags, sizeof(flags));
if (error != 0) {
perror("setsockopt");
close(sfd);
continue;
}
}
#endif
//setsockopt
setsockopt(sfd, SOL_SOCKET, SO_REUSEADDR, (void *)&flags, sizeof(flags));
if (IS_UDP(transport)) {
maximize_sndbuf(sfd);
} else {
error = setsockopt(sfd, SOL_SOCKET, SO_KEEPALIVE, (void *)&flags, sizeof(flags));
if (error != 0)
perror("setsockopt");

error = setsockopt(sfd, SOL_SOCKET, SO_LINGER, (void *)&ling, sizeof(ling));
if (error != 0)
perror("setsockopt");

error = setsockopt(sfd, IPPROTO_TCP, TCP_NODELAY, (void *)&flags, sizeof(flags));
if (error != 0)
perror("setsockopt");
}
//bind
if (bind(sfd, next->ai_addr, next->ai_addrlen) == -1) {
if (errno != EADDRINUSE) {
perror("bind()");
close(sfd);
freeaddrinfo(ai);
return 1;
}
close(sfd);
continue;
} else {
success++;

//TCP listen
if (!IS_UDP(transport) && listen(sfd, settings.backlog) == -1) {
perror("listen()");
close(sfd);
freeaddrinfo(ai);
return 1;
}
if (portnumber_file != NULL &&
(next->ai_addr->sa_family == AF_INET ||
next->ai_addr->sa_family == AF_INET6)) {
union {
struct sockaddr_in in;
struct sockaddr_in6 in6;
} my_sockaddr;
socklen_t len = sizeof(my_sockaddr);
if (getsockname(sfd, (struct sockaddr*)&my_sockaddr, &len)==0) {
if (next->ai_addr->sa_family == AF_INET) {
fprintf(portnumber_file, "%s INET: %u\n",
IS_UDP(transport) ? "UDP" : "TCP",
ntohs(my_sockaddr.in.sin_port));
} else {
fprintf(portnumber_file, "%s INET6: %u\n",
IS_UDP(transport) ? "UDP" : "TCP",
ntohs(my_sockaddr.in6.sin6_port));
}
}
}
}

if (IS_UDP(transport))
{

//UDP 的处理中不需要accept,所以直接派发connection到工作线程。
int c;

for (c = 0; c < settings.num_threads; c++) {
/* this is guaranteed to hit all threads because we round-robin */
dispatch_conn_new(sfd, conn_read, EV_READ | EV_PERSIST,
UDP_READ_BUFFER_SIZE, transport);
}
} else
{

//TCP的处理(注意,这里dispatcher_thread同样调用了conn_new来绑定conn_event到其main_base. 并且此时conn的初始状态为conn_listening, 事件为持久可读, 而在conn_new中注册了conn_event的回调函数为event_handler,所以,dispatche_thread在当前listen的socket可读时就会调用event_handler,进而调用driver_machine(c) 进入状态机。而在driver_machine中如果是主线程(dispatcher_thread)则会在accept socket后调用dispatch_new_conn函数来给各worker_thread派发connection...)
if (!(listen_conn_add = conn_new(sfd, conn_listening,
EV_READ | EV_PERSIST, 1,
transport, main_base))) {
fprintf(stderr, "failed to create listening connection\n");
exit(EXIT_FAILURE);
}
listen_conn_add->next = listen_conn;
listen_conn = listen_conn_add;
}
}

freeaddrinfo(ai);

/* Return zero iff we detected no errors in starting up connections */
return success == 0;
}


8. 看看UDP和TCP模式下dispatcher_thread都会调用的dispatch_new_conn函数(在thread.c中)

/*
* Dispatches a new connection to another thread. This is only ever called
* from the main thread, either during initialization (for UDP) or because
* of an incoming connection.
*/

void dispatch_conn_new(int sfd, enum conn_states init_state, int event_flags,
int read_buffer_size, enum network_transport transport) {
CQ_ITEM *item = cqi_new();
int tid = (last_thread + 1) % settings.num_threads; //轮询的方式找worker_thread

LIBEVENT_THREAD *thread = threads + tid;

last_thread = tid;

item->sfd = sfd;
item->init_state = init_state;
item->event_flags = event_flags;
item->read_buffer_size = read_buffer_size;
item->transport = transport;
//push conn到worker_thread的CQ中
cq_push(thread->new_conn_queue, item);

MEMCACHED_CONN_DISPATCH(sfd, thread->thread_id);
if (write(thread->notify_send_fd, "", 1) != 1) {
perror("Writing to thread notify pipe");
}
}


9. 先看看unix domain socket模式下主线程的事件处理设置。(在上面的6中调用的 server_socket_unix函数

static int server_socket_unix(const char *path, int access_mask) {
int sfd;
struct linger ling = {0, 0};
struct sockaddr_un addr;
struct stat tstat;
int flags =1;
int old_umask;

if (!path) {
return 1;
}

if ((sfd = new_socket_unix()) == -1) {
return 1;
}

/*
* Clean up a previous socket file if we left it around
*/

if (lstat(path, &tstat) == 0) {
if (S_ISSOCK(tstat.st_mode))
unlink(path);
}

setsockopt(sfd, SOL_SOCKET, SO_REUSEADDR, (void *)&flags, sizeof(flags));
setsockopt(sfd, SOL_SOCKET, SO_KEEPALIVE, (void *)&flags, sizeof(flags));
setsockopt(sfd, SOL_SOCKET, SO_LINGER, (void *)&ling, sizeof(ling));

/*
* the memset call clears nonstandard fields in some impementations
* that otherwise mess things up.
*/

memset(&addr, 0, sizeof(addr));

addr.sun_family = AF_UNIX;
strncpy(addr.sun_path, path, sizeof(addr.sun_path) - 1);
assert(strcmp(addr.sun_path, path) == 0);
old_umask = umask( ~(access_mask&0777));
if (bind(sfd, (struct sockaddr *)&addr, sizeof(addr)) == -1) {
perror("bind()");
close(sfd);
umask(old_umask);
return 1;
}
umask(old_umask);
if (listen(sfd, settings.backlog) == -1) {
perror("listen()");
close(sfd);
return 1;
}
if (!(listen_conn = conn_new(sfd, conn_listening, //同样是调用conn_new
EV_READ | EV_PERSIST, 1,
local_transport, main_base))) {
fprintf(stderr, "failed to create listening connection\n");
exit(EXIT_FAILURE);
}

return 0;
}

人气教程排行