当前位置:Gxlcms > mysql > redis源代码分析25–VM(下)

redis源代码分析25–VM(下)

时间:2021-07-01 10:21:17 帮助过:7人阅读

这一节介绍下redis中的多线程机制。 先看看多线程换出的机制。 serverCron函数中调用 vmSwapOneObjectThreaded开始多线程方式换出value,vmSwapOneObjectThreaded会调用 vmSwapOneObject(参看上一节的解释),而vmSwapOneObject最终会调用vmSwapObjectThreaded。

这一节介绍下redis中的多线程机制。

先看看多线程换出的机制。

serverCron函数中调用 vmSwapOneObjectThreaded开始多线程方式换出value,vmSwapOneObjectThreaded会调用 vmSwapOneObject(参看上一节的解释),而vmSwapOneObject最终会调用vmSwapObjectThreaded。

  1. static int vmSwapObjectThreaded(robj *key, robj *val, redisDb *db) {
  2. iojob *j;
  3. assert(key->storage == REDIS_VM_MEMORY);
  4. assert(key->refcount == 1);
  5. j = zmalloc(sizeof(*j));
  6. j->type = REDIS_IOJOB_PREPARE_SWAP;
  7. j->db = db;
  8. j->key = key;
  9. j->val = val;
  10. incrRefCount(val);
  11. j->canceled = 0;
  12. j->thread = (pthread_t) -1;
  13. key->storage = REDIS_VM_SWAPPING;
  14. lockThreadedIO();
  15. queueIOJob(j);
  16. unlockThreadedIO();
  17. return REDIS_OK;
  18. }

vmSwapObjectThreaded 会创建一个类型为REDIS_IOJOB_PREPARE_SWAP的job,然后使用queueIOJob来排队。而queueIOJob所做的主要工作就是将新job加入到server.io_newjobs,并在创建的线程数还没超过配置值时,创建新的线程。

  1. /* This function must be called while with threaded IO locked */
  2. static void queueIOJob(iojob *j) {
  3. redisLog(REDIS_DEBUG,"Queued IO Job %p type %d about key '%s'\n",
  4. (void*)j, j->type, (char*)j->key->ptr);
  5. listAddNodeTail(server.io_newjobs,j);
  6. if (server.io_active_threads < server.vm_max_threads)
  7. spawnIOThread();
  8. }

从spawnIOThread中可以知道,新线程的入口点是IOThreadEntryPoint。

  1. static void spawnIOThread(void) {
  2. pthread_t thread;
  3. sigset_t mask, omask;
  4. int err;
  5. sigemptyset(&mask);
  6. sigaddset(&mask,SIGCHLD);
  7. sigaddset(&mask,SIGHUP);
  8. sigaddset(&mask,SIGPIPE);
  9. pthread_sigmask(SIG_SETMASK, &mask, &omask);
  10. while ((err = pthread_create(&thread,&server.io_threads_attr,IOThreadEntryPoint,NULL)) != 0) {
  11. redisLog(REDIS_WARNING,"Unable to spawn an I/O thread: %s",
  12. strerror(err));
  13. usleep(1000000);
  14. }
  15. pthread_sigmask(SIG_SETMASK, &omask, NULL);
  16. server.io_active_threads++;
  17. }

IOThreadEntryPoint会将io_newjobs中的job移入server.io_processing,然后在做完job类型的工作后(加载value/计算value所需交换页数/换出value),将job从server.io_processing移入io_processed中。然后往 server.io_ready_pipe_write所在的管道(io_ready_pipe_read、io_ready_pipe_write组成管道的两端)写入一个字节,让睡眠中的vmThreadedIOCompletedJob继续运行,该函数会做些后续工作。

  1. static void *IOThreadEntryPoint(void *arg) {
  2. iojob *j;
  3. listNode *ln;
  4. REDIS_NOTUSED(arg);
  5. pthread_detach(pthread_self());
  6. while(1) {
  7. /* Get a new job to process */
  8. lockThreadedIO();
  9. if (listLength(server.io_newjobs) == 0) {
  10. /* No new jobs in queue, exit. */
  11. redisLog(REDIS_DEBUG,"Thread %ld exiting, nothing to do",
  12. (long) pthread_self());
  13. server.io_active_threads--;
  14. unlockThreadedIO();
  15. return NULL;
  16. }
  17. ln = listFirst(server.io_newjobs);
  18. j = ln->value;
  19. listDelNode(server.io_newjobs,ln);
  20. /* Add the job in the processing queue */
  21. j->thread = pthread_self();
  22. listAddNodeTail(server.io_processing,j);
  23. ln = listLast(server.io_processing); /* We use ln later to remove it */
  24. unlockThreadedIO();
  25. redisLog(REDIS_DEBUG,"Thread %ld got a new job (type %d): %p about key '%s'",
  26. (long) pthread_self(), j->type, (void*)j, (char*)j->key->ptr);
  27. /* Process the Job */
  28. if (j->type == REDIS_IOJOB_LOAD) {
  29. j->val = vmReadObjectFromSwap(j->page,j->key->vtype);
  30. } else if (j->type == REDIS_IOJOB_PREPARE_SWAP) {
  31. FILE *fp = fopen("/dev/null","w+");
  32. j->pages = rdbSavedObjectPages(j->val,fp);
  33. fclose(fp);
  34. } else if (j->type == REDIS_IOJOB_DO_SWAP) {
  35. if (vmWriteObjectOnSwap(j->val,j->page) == REDIS_ERR)
  36. j->canceled = 1;
  37. }
  38. /* Done: insert the job into the processed queue */
  39. redisLog(REDIS_DEBUG,"Thread %ld completed the job: %p (key %s)",
  40. (long) pthread_self(), (void*)j, (char*)j->key->ptr);
  41. lockThreadedIO();
  42. listDelNode(server.io_processing,ln);
  43. listAddNodeTail(server.io_processed,j);
  44. unlockThreadedIO();
  45. /* Signal the main thread there is new stuff to process */
  46. assert(write(server.io_ready_pipe_write,"x",1) == 1);
  47. }
  48. return NULL; /* never reached */
  49. }
  50. static void vmThreadedIOCompletedJob(aeEventLoop *el, int fd, void *privdata,
  51. int mask)
  52. {
  53. char buf[1];
  54. int retval, processed = 0, toprocess = -1, trytoswap = 1;
  55. REDIS_NOTUSED(el);
  56. REDIS_NOTUSED(mask);
  57. REDIS_NOTUSED(privdata);
  58. if (privdata != NULL) trytoswap = 0; /* check the comments above... */
  59. /* For every byte we read in the read side of the pipe, there is one
  60. * I/O job completed to process. */
  61. while((retval = read(fd,buf,1)) == 1) {
  62. iojob *j;
  63. listNode *ln;
  64. robj *key;
  65. struct dictEntry *de;
  66. redisLog(REDIS_DEBUG,"Processing I/O completed job");
  67. /* Get the processed element (the oldest one) */
  68. lockThreadedIO();
  69. assert(listLength(server.io_processed) != 0);
  70. if (toprocess == -1) {
  71. toprocess = (listLength(server.io_processed)*REDIS_MAX_COMPLETED_JOBS_PROCESSED)/100;
  72. if (toprocess <= 0) toprocess = 1;
  73. }
  74. ln = listFirst(server.io_processed);
  75. j = ln->value;
  76. listDelNode(server.io_processed,ln);
  77. unlockThreadedIO();
  78. /* If this job is marked as canceled, just ignore it */
  79. if (j->canceled) {
  80. freeIOJob(j);
  81. continue;
  82. }
  83. /* Post process it in the main thread, as there are things we
  84. * can do just here to avoid race conditions and/or invasive locks */
  85. redisLog(REDIS_DEBUG,"Job %p type: %d, key at %p (%s) refcount: %d\n", (void*) j, j->type, (void*)j->key, (char*)j->key->ptr, j->key->refcount);
  86. de = dictFind(j->db->dict,j->key);
  87. assert(de != NULL);
  88. key = dictGetEntryKey(de);
  89. if (j->type == REDIS_IOJOB_LOAD) {
  90. redisDb *db;
  91. /* Key loaded, bring it at home */
  92. key->storage = REDIS_VM_MEMORY;
  93. key->vm.atime = server.unixtime;
  94. vmMarkPagesFree(key->vm.page,key->vm.usedpages);
  95. redisLog(REDIS_DEBUG, "VM: object %s loaded from disk (threaded)",
  96. (unsigned char*) key->ptr);
  97. server.vm_stats_swapped_objects--;
  98. server.vm_stats_swapins++;
  99. dictGetEntryVal(de) = j->val;
  100. incrRefCount(j->val);
  101. db = j->db;
  102. freeIOJob(j);
  103. /* Handle clients waiting for this key to be loaded. */
  104. handleClientsBlockedOnSwappedKey(db,key);
  105. } else if (j->type == REDIS_IOJOB_PREPARE_SWAP) {
  106. /* Now we know the amount of pages required to swap this object.
  107. * Let's find some space for it, and queue this task again
  108. * rebranded as REDIS_IOJOB_DO_SWAP. */
  109. if (!vmCanSwapOut() ||
  110. vmFindContiguousPages(&j->page,j->pages) == REDIS_ERR)
  111. {
  112. /* Ooops... no space or we can't swap as there is
  113. * a fork()ed Redis trying to save stuff on disk. */
  114. freeIOJob(j);
  115. key->storage = REDIS_VM_MEMORY; /* undo operation */
  116. } else {
  117. /* Note that we need to mark this pages as used now,
  118. * if the job will be canceled, we'll mark them as freed
  119. * again. */
  120. vmMarkPagesUsed(j->page,j->pages);
  121. j->type = REDIS_IOJOB_DO_SWAP;
  122. lockThreadedIO();
  123. queueIOJob(j);
  124. unlockThreadedIO();
  125. }
  126. } else if (j->type == REDIS_IOJOB_DO_SWAP) {
  127. robj *val;
  128. /* Key swapped. We can finally free some memory. */
  129. if (key->storage != REDIS_VM_SWAPPING) {
  130. printf("key->storage: %d\n",key->storage);
  131. printf("key->name: %s\n",(char*)key->ptr);
  132. printf("key->refcount: %d\n",key->refcount);
  133. printf("val: %p\n",(void*)j->val);
  134. printf("val->type: %d\n",j->val->type);
  135. printf("val->ptr: %s\n",(char*)j->val->ptr);
  136. }
  137. redisAssert(key->storage == REDIS_VM_SWAPPING);
  138. val = dictGetEntryVal(de);
  139. key->vm.page = j->page;
  140. key->vm.usedpages = j->pages;
  141. key->storage = REDIS_VM_SWAPPED;
  142. key->vtype = j->val->type;
  143. decrRefCount(val); /* Deallocate the object from memory. */
  144. dictGetEntryVal(de) = NULL;
  145. redisLog(REDIS_DEBUG,
  146. "VM: object %s swapped out at %lld (%lld pages) (threaded)",
  147. (unsigned char*) key->ptr,
  148. (unsigned long long) j->page, (unsigned long long) j->pages);
  149. server.vm_stats_swapped_objects++;
  150. server.vm_stats_swapouts++;
  151. freeIOJob(j);
  152. /* Put a few more swap requests in queue if we are still
  153. * out of memory */
  154. if (trytoswap && vmCanSwapOut() &&
  155. zmalloc_used_memory() > server.vm_max_memory)
  156. {
  157. int more = 1;
  158. while(more) {
  159. lockThreadedIO();
  160. more = listLength(server.io_newjobs) <
  161. (unsigned) server.vm_max_threads;
  162. unlockThreadedIO();
  163. /* Don't waste CPU time if swappable objects are rare. */
  164. if (vmSwapOneObjectThreaded() == REDIS_ERR) {
  165. trytoswap = 0;
  166. break;
  167. }
  168. }
  169. }
  170. }
  171. processed++;
  172. if (processed == toprocess) return;
  173. }
  174. if (retval < 0 && errno != EAGAIN) {
  175. redisLog(REDIS_WARNING,
  176. "WARNING: read(2) error in vmThreadedIOCompletedJob() %s",
  177. strerror(errno));
  178. }
  179. }

人气教程排行