当前位置:Gxlcms > 数据库问题 > [LevelDB] 写批处理过程详解

[LevelDB] 写批处理过程详解

时间:2021-07-01 10:21:17 帮助过:13人阅读

  1. <span style="font-size: 14px">1 Status DBImpl::Write(const WriteOptions& options, WriteBatch* my_batch) {
  2. 2  // -----A begin-------
  3. 3 Writer w(&mutex_);
  4. 4 w.batch = my_batch;
  5. 5 w.sync = options.sync;
  6. 6 w.done = false;
  7. 7 // -----A end --------
  8. 8
  9. 9
  10. 10 // -----B begin-------
  11. 11 MutexLock l(&mutex_);
  12. 12 writers_.push_back(&w);
  13. 13 while (!w.done && &w != writers_.front()) {
  14. 14 w.cv.Wait();
  15. 15 }
  16. 16 if (w.done) {
  17. 17 return w.status;
  18. 18 }
  19. 19 // -----B end -------
  20. 20
  21. 21 // May temporarily unlock and wait.
  22. 22 Status status = MakeRoomForWrite(my_batch == NULL);
  23. 23 uint64_t last_sequence = versions_->LastSequence();
  24. 24 Writer* last_writer = &w;
  25. 25 if (status.ok() && my_batch != NULL) { // NULL batch is for compactions
  26. 26 WriteBatch* updates = BuildBatchGroup(&last_writer);
  27. 27 WriteBatchInternal::SetSequence(updates, last_sequence + 1);
  28. 28 last_sequence += WriteBatchInternal::Count(updates);
  29. 29
  30. 30 // Add to log and apply to memtable. We can release the lock
  31. 31 // during this phase since &w is currently responsible for logging
  32. 32 // and protects against concurrent loggers and concurrent writes
  33. 33 // into mem_.
  34. 34 {
  35. 35 // -----C begin-------
  36. 36 mutex_.Unlock();
  37. 37 // -----C end -------
  38. 38 status = log_->AddRecord(WriteBatchInternal::Contents(updates));
  39. 39 if (status.ok() && options.sync) {
  40. 40 status = logfile_->Sync();
  41. 41 }
  42. 42 if (status.ok()) {
  43. 43 status = WriteBatchInternal::InsertInto(updates, mem_);
  44. 44 }
  45. 45 // -----D begin-------
  46. 46 mutex_.Lock();
  47. 47 // -----D end -------
  48. 48 }
  49. 49 if (updates == tmp_batch_) tmp_batch_->Clear();
  50. 50
  51. 51 versions_->SetLastSequence(last_sequence);
  52. 52 }
  53. 53
  54. 54 // -----E begin-------
  55. 55 while (true) {
  56. 56 Writer* ready = writers_.front();
  57. 57 writers_.pop_front();
  58. 58 if (ready != &w) {
  59. 59 ready->status = status;
  60. 60 ready->done = true;
  61. 61 ready->cv.Signal();
  62. 62 }
  63. 63 if (ready == last_writer) break;
  64. 64 }
  65. 65 // -----E end -------
  66. 66
  67. 67
  68. 68 // -----F begin-------
  69. 69 // Notify new head of write queue
  70. 70 if (!writers_.empty()) {
  71. 71 writers_.front()->cv.Signal();
  72. 72 }
  73. 73 // -----F end-------
  74. 74
  75. 75 return status;
  76. 76 }</span>

    如上,A段代码定义一个Writer w, w的成员包括了batch信息,同时初始化了一个条件变量成员(port::CondVar)

  假设同时有w1, w2, w3, w4, w5, w6 并发请求写入。

  B段代码让竞争到mutex资源的w1获取了锁。添加到writers队列里去,此时队列只有一个w1, 从而其顺利的进行BuildBatchGroup。当运行到c段代码时,mutex互斥锁释放,这时(w2, w3, w4, w5, w6)会竞争锁,由于B段代码中不满足队首条件,均等待并释放锁了。从而队列可能会如(w3, w5, w2, w4).

  继而w1进行log写入和memtable写入,之所以这里在无锁状况下时安全的,因为其它的写操作都不满足队首条件,进而不会进入log和memtable写入阶段。 当w1完成log和memtable写入后,进入d段代码,则mutex又锁住,这时B段代码中队列因为获取不到锁则队列不会修改。

  进入E段代码后,w1被pop出来,由于reader==w, 并且ready==last_writer,所以直接到F段代码,唤醒了此时处于队首的w3.

  w3唤醒时,发现自己是队首,可以顺利的进行进入BuildBatchGroup,在该函数中,遍历了目前所有的队列元素,形成一个update的batch,即将w3, w5, w2, w4合并为一个batch. 并将last_writer置为此时处于队尾的最后一个元素w4,c段代码运行后,因为释放了锁资源,队列可能随着DBImpl::Write的调用而更改,如队列状况可能为(w3, w5, w2, w4, w6, w9, w8).

  C段和D段间的代码将w3, w5, w2, w4整个的batch写入log和memtable. 到E段时,分别对w5, w2, w4进行了一次cond signal.当判断到完w4 == lastwriter时,则退出E段代码。F段则对队首的w6唤醒,从而按上述步骤依次进行下去。

  这样就形成了多个并发write 合并为一个batch写入log和memtable的机制。

[LevelDB] 写批处理过程详解

标签:bim   his   etl   update   tool   room   option   多个   nts   

人气教程排行