时间:2021-07-01 10:21:17 帮助过:7人阅读
这一节介绍下redis中的多线程机制。 先看看多线程换出的机制。 serverCron函数中调用 vmSwapOneObjectThreaded开始多线程方式换出value,vmSwapOneObjectThreaded会调用 vmSwapOneObject(参看上一节的解释),而vmSwapOneObject最终会调用vmSwapObjectThrea
这一节介绍下redis中的多线程机制。
先看看多线程换出的机制。
serverCron函数中调用 vmSwapOneObjectThreaded开始多线程方式换出value,vmSwapOneObjectThreaded会调用 vmSwapOneObject(参看上一节的解释),而vmSwapOneObject最终会调用vmSwapObjectThreaded。
- static int vmSwapObjectThreaded(robj *key, robj *val, redisDb *db) {
- iojob *j;
- assert(key->storage == REDIS_VM_MEMORY);
- assert(key->refcount == 1);
- j = zmalloc(sizeof(*j));
- j->type = REDIS_IOJOB_PREPARE_SWAP;
- j->db = db;
- j->key = key;
- j->val = val;
- incrRefCount(val);
- j->canceled = 0;
- j->thread = (pthread_t) -1;
- key->storage = REDIS_VM_SWAPPING;
- lockThreadedIO();
- queueIOJob(j);
- unlockThreadedIO();
- return REDIS_OK;
- }
vmSwapObjectThreaded 会创建一个类型为REDIS_IOJOB_PREPARE_SWAP的job,然后使用queueIOJob来排队。而queueIOJob所做的主要工作就是就是将新job加入到server.io_newjobs,并在创建的线程数还没超过配置值时,创建新的线程。
- /* This function must be called while with threaded IO locked */
- static void queueIOJob(iojob *j) {
- redisLog(REDIS_DEBUG,"Queued IO Job %p type %d about key '%s'\n",
- (void*)j, j->type, (char*)j->key->ptr);
- listAddNodeTail(server.io_newjobs,j);
- if (server.io_active_threads < server.vm_max_threads)
- spawnIOThread();
- }
从spawnIOThread中可以知道,新线程的入口点是IOThreadEntryPoint。
- static void spawnIOThread(void) {
- pthread_t thread;
- sigset_t mask, omask;
- int err;
- sigemptyset(&mask);
- sigaddset(&mask,SIGCHLD);
- sigaddset(&mask,SIGHUP);
- sigaddset(&mask,SIGPIPE);
- pthread_sigmask(SIG_SETMASK, &mask, &omask);
- while ((err = pthread_create(&thread,&server.io_threads_attr,IOThreadEntryPoint,NULL)) != 0) {
- redisLog(REDIS_WARNING,"Unable to spawn an I/O thread: %s",
- strerror(err));
- usleep(1000000);
- }
- pthread_sigmask(SIG_SETMASK, &omask, NULL);
- server.io_active_threads++;
- }
IOThreadEntryPoint会将io_newjobs中的job移入server.io_processing,然后在做完job类型的工作后(加载value/计算value所需交换页数/换出value),将job从server.io_processing移入io_processed中。然后往 server.io_ready_pipe_write所在的管道(io_ready_pipe_read、io_ready_pipe_write组成管道的两端)写入一个字节,让睡眠中的vmThreadedIOCompletedJob继续运行,该函数会做些后续工作。
- static void *IOThreadEntryPoint(void *arg) {
- iojob *j;
- listNode *ln;
- REDIS_NOTUSED(arg);
- pthread_detach(pthread_self());
- while(1) {
- /* Get a new job to process */
- lockThreadedIO();
- if (listLength(server.io_newjobs) == 0) {
- /* No new jobs in queue, exit. */
- redisLog(REDIS_DEBUG,"Thread %ld exiting, nothing to do",
- (long) pthread_self());
- server.io_active_threads--;
- unlockThreadedIO();
- return NULL;
- }
- ln = listFirst(server.io_newjobs);
- j = ln->value;
- listDelNode(server.io_newjobs,ln);
- /* Add the job in the processing queue */
- j->thread = pthread_self();
- listAddNodeTail(server.io_processing,j);
- ln = listLast(server.io_processing); /* We use ln later to remove it */
- unlockThreadedIO();
- redisLog(REDIS_DEBUG,"Thread %ld got a new job (type %d): %p about key '%s'",
- (long) pthread_self(), j->type, (void*)j, (char*)j->key->ptr);
- /* Process the Job */
- if (j->type == REDIS_IOJOB_LOAD) {
- j->val = vmReadObjectFromSwap(j->page,j->key->vtype);
- } else if (j->type == REDIS_IOJOB_PREPARE_SWAP) {
- FILE *fp = fopen("/dev/null","w+");
- j->pages = rdbSavedObjectPages(j->val,fp);
- fclose(fp);
- } else if (j->type == REDIS_IOJOB_DO_SWAP) {
- if (vmWriteObjectOnSwap(j->val,j->page) == REDIS_ERR)
- j->canceled = 1;
- }
- /* Done: insert the job into the processed queue */
- redisLog(REDIS_DEBUG,"Thread %ld completed the job: %p (key %s)",
- (long) pthread_self(), (void*)j, (char*)j->key->ptr);
- lockThreadedIO();
- listDelNode(server.io_processing,ln);
- listAddNodeTail(server.io_processed,j);
- unlockThreadedIO();
- /* Signal the main thread there is new stuff to process */
- assert(write(server.io_ready_pipe_write,"x",1) == 1);
- }
- return NULL; /* never reached */
- }
- static void vmThreadedIOCompletedJob(aeEventLoop *el, int fd, void *privdata,
- int mask)
- {
- char buf[1];
- int retval, processed = 0, toprocess = -1, trytoswap = 1;
- REDIS_NOTUSED(el);
- REDIS_NOTUSED(mask);
- REDIS_NOTUSED(privdata);
- if (privdata != NULL) trytoswap = 0; /* check the comments above... */
- /* For every byte we read in the read side of the pipe, there is one
- * I/O job completed to process. */
- while((retval = read(fd,buf,1)) == 1) {
- iojob *j;
- listNode *ln;
- robj *key;
- struct dictEntry *de;
- redisLog(REDIS_DEBUG,"Processing I/O completed job");
- /* Get the processed element (the oldest one) */
- lockThreadedIO();
- assert(listLength(server.io_processed) != 0);
- if (toprocess == -1) {
- toprocess = (listLength(server.io_processed)*REDIS_MAX_COMPLETED_JOBS_PROCESSED)/100;
- if (toprocess <= 0) toprocess = 1;
- }
- ln = listFirst(server.io_processed);
- j = ln->value;
- listDelNode(server.io_processed,ln);
- unlockThreadedIO();
- /* If this job is marked as canceled, just ignore it */
- if (j->canceled) {
- freeIOJob(j);
- continue;
- }
- /* Post process it in the main thread, as there are things we
- * can do just here to avoid race conditions and/or invasive locks */
- redisLog(REDIS_DEBUG,"Job %p type: %d, key at %p (%s) refcount: %d\n", (void*) j, j->type, (void*)j->key, (char*)j->key->ptr, j->key->refcount);
- de = dictFind(j->db->dict,j->key);
- assert(de != NULL);
- key = dictGetEntryKey(de);
- if (j->type == REDIS_IOJOB_LOAD) {
- redisDb *db;
- /* Key loaded, bring it at home */
- key->storage = REDIS_VM_MEMORY;
- key->vm.atime = server.unixtime;
- vmMarkPagesFree(key->vm.page,key->vm.usedpages);
- redisLog(REDIS_DEBUG, "VM: object %s loaded from disk (threaded)",
- (unsigned char*) key->ptr);
- server.vm_stats_swapped_objects--;
- server.vm_stats_swapins++;
- dictGetEntryVal(de) = j->val;
- incrRefCount(j->val);
- db = j->db;
- freeIOJob(j);
- /* Handle clients waiting for this key to be loaded. */
- handleClientsBlockedOnSwappedKey(db,key);
- } else if (j->type == REDIS_IOJOB_PREPARE_SWAP) {
- /* Now we know the amount of pages required to swap this object.
- * Let's find some space for it, and queue this task again
- * rebranded as REDIS_IOJOB_DO_SWAP. */
- if (!vmCanSwapOut() ||
- vmFindContiguousPages(&j->page,j->pages) == REDIS_ERR)
- {
- /* Ooops... no space or we can't swap as there is
- * a fork()ed Redis trying to save stuff on disk. */
- freeIOJob(j);
- key->storage = REDIS_VM_MEMORY; /* undo operation */
- } else {
- /* Note that we need to mark this pages as used now,
- * if the job will be canceled, we'll mark them as freed
- * again. */
- vmMarkPagesUsed(j->page,j->pages);
- j->type = REDIS_IOJOB_DO_SWAP;
- lockThreadedIO();
- queueIOJob(j);
- unlockThreadedIO();
- }
- } else if (j->type == REDIS_IOJOB_DO_SWAP) {
- robj *val;
- /* Key swapped. We can finally free some memory. */
- if (key->storage != REDIS_VM_SWAPPING) {
- printf("key->storage: %d\n",key->storage);
- printf("key->name: %s\n",(char*)key->ptr);
- printf("key->refcount: %d\n",key->refcount);
- printf("val: %p\n",(void*)j->val);
- printf("val->type: %d\n",j->val->type);
- printf("val->ptr: %s\n",(char*)j->val->ptr);
- }
- redisAssert(key->storage == REDIS_VM_SWAPPING);
- val = dictGetEntryVal(de);
- key->vm.page = j->page;
- key->vm.usedpages = j->pages;
- key->storage = REDIS_VM_SWAPPED;
- key->vtype = j->val->type;
- decrRefCount(val); /* Deallocate the object from memory. */
- dictGetEntryVal(de) = NULL;
- redisLog(REDIS_DEBUG,
- "VM: object %s swapped out at %lld (%lld pages) (threaded)",
- (unsigned char*) key->ptr,
- (unsigned long long) j->page, (unsigned long long) j->pages);
- server.vm_stats_swapped_objects++;
- server.vm_stats_swapouts++;
- freeIOJob(j);
- /* Put a few more swap requests in queue if we are still
- * out of memory */
- if (trytoswap && vmCanSwapOut() &&
- zmalloc_used_memory() > server.vm_max_memory)
- {
- int more = 1;
- while(more) {
- lockThreadedIO();
- more = listLength(server.io_newjobs) <
- (unsigned) server.vm_max_threads;
- unlockThreadedIO();
- /* Don't waste CPU time if swappable objects are rare. */
- if (vmSwapOneObjectThreaded() == REDIS_ERR) {
- trytoswap = 0;
- break;
- }
- }
- }
- }
- processed++;
- if (processed == toprocess) return;
- }
- if (retval < 0 && errno != EAGAIN) {
- redisLog(REDIS_WARNING,
- "WARNING: read(2) error in vmThreadedIOCompletedJob() %s",
- strerror(errno));
- }
- }
原文地址:redis源代码分析25–VM(下), 感谢原作者分享。