当前位置:Gxlcms > 数据库问题 > LevelDB的源码阅读(四) Compaction操作

LevelDB的源码阅读(四) Compaction操作

时间:2021-07-01 10:21:17 帮助过:20人阅读

Leveldb的数据文件是按层存放的,默认配置的最高层级是7,即level0,level1,…,level7.内存中的immutable总是写入level0,除level0之外的各个层leveli的所有数据文件的key范围都是互相不相交的.当满足一定条件时,leveli的数据文件会和leveli+1的数据文件进行merge,产生新的leveli+1层级的文件,这个磁盘文件的merge过程和immutable的dump过程叫做Compaction,在leveldb中是由一个单独的后台线程来完成的.

进行Compaction操作的条件如下:

1.产生了新的immutable table需要写入数据文件

2.某个level的数据规模过大

3.某个文件被无效查询的次数过多(在文件i中查询key,没有找到key,这次查询称为文件i的无效查询)

4.手动compaction

满足以上条件会启动Compaction过程,接下来分析详细的Compaction过程.

 Leveldb进行Compaction的入口函数是db文件夹下db_impl.cc文件中的DBImpl::MaybeScheduleCompaction,该函数在每次leveldb进行读写操作时都有可能被调用.源码内容如下:

  1. <span style="color: #0000ff">void</span><span style="color: #000000"> DBImpl::MaybeScheduleCompaction() {
  2. mutex_.AssertHeld();
  3. </span><span style="color: #0000ff">if</span><span style="color: #000000"> (bg_compaction_scheduled_) {
  4. </span><span style="color: #008000">//</span><span style="color: #008000"> Already scheduled</span>
  5. } <span style="color: #0000ff">else</span> <span style="color: #0000ff">if</span><span style="color: #000000"> (shutting_down_.Acquire_Load()) {
  6. </span><span style="color: #008000">//</span><span style="color: #008000"> DB is being deleted; no more background compactions</span>
  7. } <span style="color: #0000ff">else</span> <span style="color: #0000ff">if</span> (!<span style="color: #000000">bg_error_.ok()) {
  8. </span><span style="color: #008000">//</span><span style="color: #008000"> Already got an error; no more changes</span>
  9. } <span style="color: #0000ff">else</span> <span style="color: #0000ff">if</span> (imm_ == NULL &&<span style="color: #000000">
  10. manual_compaction_ </span>== NULL &&
  11. !versions_-><span style="color: #000000">NeedsCompaction()) {
  12. </span><span style="color: #008000">//</span><span style="color: #008000"> No work to be done</span>
  13. } <span style="color: #0000ff">else</span><span style="color: #000000"> {
  14. bg_compaction_scheduled_ </span>= <span style="color: #0000ff">true</span><span style="color: #000000">;
  15. env_</span>->Schedule(&DBImpl::BGWork, <span style="color: #0000ff">this</span>); <span style="color: #008000">//</span><span style="color: #008000">新建后台任务并进行调度</span>
  16. <span style="color: #000000"> }
  17. }</span>

首先调用db文件夹下version_set.h中的NeedsCompaction()判断是否需要启动Compact任务.源码内容如下:

  1. <span style="color: #008000">//</span><span style="color: #008000"> Returns true iff some level needs a compaction.</span>
  2. <span style="color: #0000ff">bool</span> NeedsCompaction() <span style="color: #0000ff">const</span><span style="color: #000000"> {
  3. Version</span>* v =<span style="color: #000000"> current_;
  4. </span><span style="color: #0000ff">return</span> (v->compaction_score_ >= <span style="color: #800080">1</span>) || (v->file_to_compact_ !=<span style="color: #000000"> NULL);
  5. }</span>

version_set.cc中compaction_score_ 的计算如下:

  1. <span style="color: #0000ff">void</span> VersionSet::Finalize(Version*<span style="color: #000000"> v) {
  2. </span><span style="color: #008000">//</span><span style="color: #008000"> Precomputed best level for next compaction</span>
  3. <span style="color: #0000ff">int</span> best_level = -<span style="color: #800080">1</span><span style="color: #000000">;
  4. </span><span style="color: #0000ff">double</span> best_score = -<span style="color: #800080">1</span><span style="color: #000000">;
  5. </span><span style="color: #0000ff">for</span> (<span style="color: #0000ff">int</span> level = <span style="color: #800080">0</span>; level < config::kNumLevels-<span style="color: #800080">1</span>; level++<span style="color: #000000">) {
  6. </span><span style="color: #0000ff">double</span><span style="color: #000000"> score;
  7. </span><span style="color: #0000ff">if</span> (level == <span style="color: #800080">0</span><span style="color: #000000">) {
  8. </span><span style="color: #008000">//</span><span style="color: #008000"> We treat level-0 specially by bounding the number of files
  9. </span><span style="color: #008000">//</span><span style="color: #008000"> instead of number of bytes for two reasons:
  10. </span><span style="color: #008000">//</span>
  11. <span style="color: #008000">//</span><span style="color: #008000"> (1) With larger write-buffer sizes, it is nice not to do too
  12. </span><span style="color: #008000">//</span><span style="color: #008000"> many level-0 compactions.
  13. </span><span style="color: #008000">//</span>
  14. <span style="color: #008000">//</span><span style="color: #008000"> (2) The files in level-0 are merged on every read and
  15. </span><span style="color: #008000">//</span><span style="color: #008000"> therefore we wish to avoid too many files when the individual
  16. </span><span style="color: #008000">//</span><span style="color: #008000"> file size is small (perhaps because of a small write-buffer
  17. </span><span style="color: #008000">//</span><span style="color: #008000"> setting, or very high compression ratios, or lots of
  18. </span><span style="color: #008000">//</span><span style="color: #008000"> overwrites/deletions).</span>
  19. score = v->files_[level].size() /<span style="color: #000000">
  20. static_cast</span><<span style="color: #0000ff">double</span>><span style="color: #000000">(config::kL0_CompactionTrigger);
  21. } </span><span style="color: #0000ff">else</span><span style="color: #000000"> {
  22. </span><span style="color: #008000">//</span><span style="color: #008000"> Compute the ratio of current size to size limit.</span>
  23. <span style="color: #0000ff">const</span> uint64_t level_bytes = TotalFileSize(v-><span style="color: #000000">files_[level]);
  24. score </span>= static_cast<<span style="color: #0000ff">double</span>>(level_bytes) /<span style="color: #000000"> MaxBytesForLevel(level);
  25. }
  26. </span><span style="color: #0000ff">if</span> (score ><span style="color: #000000"> best_score) {
  27. best_level </span>=<span style="color: #000000"> level;
  28. best_score </span>=<span style="color: #000000"> score;
  29. }
  30. }
  31. v</span>->compaction_level_ =<span style="color: #000000"> best_level;
  32. v</span>->compaction_score_ =<span style="color: #000000"> best_score;
  33. }</span>

注意,这里同时预计算了进行compaction的最佳level.

确认需要启动compaction之后,调用util文件夹下env_posix.cc文件中的PosixEnv::Schedule函数启动Compact过程.

  1. <span style="color: #0000ff">void</span> PosixEnv::Schedule(<span style="color: #0000ff">void</span> (*function)(<span style="color: #0000ff">void</span>*), <span style="color: #0000ff">void</span>*<span style="color: #000000"> arg) {
  2. PthreadCall(</span><span style="color: #800000">"</span><span style="color: #800000">lock</span><span style="color: #800000">"</span>, pthread_mutex_lock(&<span style="color: #000000">mu_));
  3. </span><span style="color: #008000">//</span><span style="color: #008000"> Start background thread if necessary</span>
  4. <span style="color: #0000ff">if</span> (!<span style="color: #000000">started_bgthread_) {
  5. started_bgthread_ </span>= <span style="color: #0000ff">true</span><span style="color: #000000">;
  6. PthreadCall(
  7. </span><span style="color: #800000">"</span><span style="color: #800000">create thread</span><span style="color: #800000">"</span><span style="color: #000000">,
  8. pthread_create(</span>&bgthread_, NULL, &PosixEnv::BGThreadWrapper, <span style="color: #0000ff">this</span><span style="color: #000000">));
  9. }
  10. </span><span style="color: #008000">//</span><span style="color: #008000"> If the queue is currently empty, the background thread may currently be
  11. </span><span style="color: #008000">//</span><span style="color: #008000"> waiting.</span>
  12. <span style="color: #0000ff">if</span><span style="color: #000000"> (queue_.empty()) {
  13. PthreadCall(</span><span style="color: #800000">"</span><span style="color: #800000">signal</span><span style="color: #800000">"</span>, pthread_cond_signal(&<span style="color: #000000">bgsignal_));
  14. }
  15. </span><span style="color: #008000">//</span><span style="color: #008000"> Add to priority queue</span>
  16. <span style="color: #000000"> queue_.push_back(BGItem());
  17. queue_.back().function </span>=<span style="color: #000000"> function;
  18. queue_.back().arg </span>=<span style="color: #000000"> arg;
  19. PthreadCall(</span><span style="color: #800000">"</span><span style="color: #800000">unlock</span><span style="color: #800000">"</span>, pthread_mutex_unlock(&<span style="color: #000000">mu_));
  20. }</span>

 如果没有后台线程,则创建后台线程,否则新建一个后台执行任务BGItem压入后台线程任务队列,然后调用PosixEnv::BGThreadWrapper唤醒后台线程: 

  1. <span style="color: #0000ff">static</span> <span style="color: #0000ff">void</span>* BGThreadWrapper(<span style="color: #0000ff">void</span>*<span style="color: #000000"> arg) {
  2. reinterpret_cast</span><PosixEnv*>(arg)-><span style="color: #000000">BGThread();
  3. </span><span style="color: #0000ff">return</span><span style="color: #000000"> NULL;
  4. }</span>

 BGThreadWrapper调用PosixEnv::BGThread,不断地从后台任务队列中拿到任务,然后执行任务

  1. <span style="color: #0000ff">void</span><span style="color: #000000"> PosixEnv::BGThread() {
  2. </span><span style="color: #0000ff">while</span> (<span style="color: #0000ff">true</span><span style="color: #000000">) {
  3. </span><span style="color: #008000">//</span><span style="color: #008000"> Wait until there is an item that is ready to run</span>
  4. PthreadCall(<span style="color: #800000">"</span><span style="color: #800000">lock</span><span style="color: #800000">"</span>, pthread_mutex_lock(&<span style="color: #000000">mu_));
  5. </span><span style="color: #0000ff">while</span><span style="color: #000000"> (queue_.empty()) {
  6. PthreadCall(</span><span style="color: #800000">"</span><span style="color: #800000">wait</span><span style="color: #800000">"</span>, pthread_cond_wait(&bgsignal_, &<span style="color: #000000">mu_));
  7. }
  8. </span><span style="color: #0000ff">void</span> (*function)(<span style="color: #0000ff">void</span>*) =<span style="color: #000000"> queue_.front().function;
  9. </span><span style="color: #0000ff">void</span>* arg =<span style="color: #000000"> queue_.front().arg;
  10. queue_.pop_front();
  11. PthreadCall(</span><span style="color: #800000">"</span><span style="color: #800000">unlock</span><span style="color: #800000">"</span>, pthread_mutex_unlock(&<span style="color: #000000">mu_));
  12. (</span>*<span style="color: #000000">function)(arg);
  13. }
  14. }</span>

回到DBImpl::MaybeScheduleCompaction,方便理解起见这里再重复一遍源码

  1. <span style="color: #0000ff">void</span><span style="color: #000000"> DBImpl::MaybeScheduleCompaction() {
  2. mutex_.AssertHeld();
  3. </span><span style="color: #0000ff">if</span><span style="color: #000000"> (bg_compaction_scheduled_) {
  4. </span><span style="color: #008000">//</span><span style="color: #008000"> Already scheduled</span>
  5. } <span style="color: #0000ff">else</span> <span style="color: #0000ff">if</span><span style="color: #000000"> (shutting_down_.Acquire_Load()) {
  6. </span><span style="color: #008000">//</span><span style="color: #008000"> DB is being deleted; no more background compactions</span>
  7. } <span style="color: #0000ff">else</span> <span style="color: #0000ff">if</span> (!<span style="color: #000000">bg_error_.ok()) {
  8. </span><span style="color: #008000">//</span><span style="color: #008000"> Already got an error; no more changes</span>
  9. } <span style="color: #0000ff">else</span> <span style="color: #0000ff">if</span> (imm_ == NULL &&<span style="color: #000000">
  10. manual_compaction_ </span>== NULL &&
  11. !versions_-><span style="color: #000000">NeedsCompaction()) {
  12. </span><span style="color: #008000">//</span><span style="color: #008000"> No work to be done</span>
  13. } <span style="color: #0000ff">else</span><span style="color: #000000"> {
  14. bg_compaction_scheduled_ </span>= <span style="color: #0000ff">true</span><span style="color: #000000">;
  15. env_</span>->Schedule(&DBImpl::BGWork, <span style="color: #0000ff">this</span>); <span style="color: #008000">//</span><span style="color: #008000">新建后台任务并进行调度</span>
  16. <span style="color: #000000"> }
  17. }</span>

之前分析了env_->Schedule进行的调度过程,现在来分析实际进行后台任务的DBImpl::BGWork.DBImpl::BGWork在db文件夹下db_impl.cc文件中.

  1. <span style="color: #0000ff">void</span> DBImpl::BGWork(<span style="color: #0000ff">void</span>*<span style="color: #000000"> db) {
  2. reinterpret_cast</span><DBImpl*>(db)-><span style="color: #000000">BackgroundCall();
  3. }</span>

DBImpl::BGWork调用DBImpl::BackgroundCall(),合并完成后可能导致有的level的文件数过多,因此会再次调用MaybeScheduleCompaction()判断是否需要继续进行合并.

  1. <span style="color: #0000ff">void</span><span style="color: #000000"> DBImpl::BackgroundCall() {
  2. MutexLock l(</span>&<span style="color: #000000">mutex_);
  3. assert(bg_compaction_scheduled_);
  4. </span><span style="color: #0000ff">if</span><span style="color: #000000"> (shutting_down_.Acquire_Load()) {
  5. </span><span style="color: #008000">//</span><span style="color: #008000"> No more background work when shutting down.</span>
  6. } <span style="color: #0000ff">else</span> <span style="color: #0000ff">if</span> (!<span style="color: #000000">bg_error_.ok()) {
  7. </span><span style="color: #008000">//</span><span style="color: #008000"> No more background work after a background error.</span>
  8. } <span style="color: #0000ff">else</span><span style="color: #000000"> {
  9. BackgroundCompaction();
  10. }
  11. bg_compaction_scheduled_ </span>= <span style="color: #0000ff">false</span><span style="color: #000000">;
  12. </span><span style="color: #008000">//</span><span style="color: #008000"> Previous compaction may have produced too many files in a level,
  13. </span><span style="color: #008000">//</span><span style="color: #008000"> so reschedule another compaction if needed.</span>
  14. <span style="color: #000000"> MaybeScheduleCompaction();
  15. bg_cv_.SignalAll();
  16. }</span>

DBImpl::BackgroundCall()调用 BackgroundCompaction(),在BackgroundCompaction()中分别完成三种不同的Compaction:对Memtable进行合并、 trivial Compaction(直接将文件移动到下一层)以及一般的合并,调用DoCompactionWork()实现.

  1. <span style="color: #0000ff">void</span><span style="color: #000000"> DBImpl::BackgroundCompaction() {
  2. mutex_.AssertHeld();
  3. </span><span style="color: #0000ff">if</span> (imm_ !=<span style="color: #000000"> NULL) {
  4. CompactMemTable();</span><span style="color: #008000">//</span><span style="color: #008000">1、对Memtable进行合并 </span>
  5. <span style="color: #0000ff">return</span><span style="color: #000000">;
  6. }
  7. Compaction</span>*<span style="color: #000000"> c;
  8. </span><span style="color: #0000ff">bool</span> is_manual = (manual_compaction_ != NULL);<span style="color: #008000">//</span><span style="color: #008000">manual_compaction默认为NULL,则is_manual默认为false </span>
  9. <span style="color: #000000"> InternalKey manual_end;
  10. </span><span style="color: #0000ff">if</span> (is_manual) { <span style="color: #008000">//</span><span style="color: #008000">取得手动compaction对象</span>
  11. ManualCompaction* m =<span style="color: #000000"> manual_compaction_;
  12. c </span>= versions_->CompactRange(m->level, m->begin, m-><span style="color: #000000">end);
  13. m</span>->done = (c ==<span style="color: #000000"> NULL);
  14. </span><span style="color: #0000ff">if</span> (c !=<span style="color: #000000"> NULL) {
  15. manual_end </span>= c->input(<span style="color: #800080">0</span>, c->num_input_files(<span style="color: #800080">0</span>) - <span style="color: #800080">1</span>)-><span style="color: #000000">largest;
  16. }
  17. Log(options_.info_log,
  18. </span><span style="color: #800000">"</span><span style="color: #800000">Manual compaction at level-%d from %s .. %s; will stop at %s\n</span><span style="color: #800000">"</span><span style="color: #000000">,
  19. m</span>-><span style="color: #000000">level,
  20. (m</span>->begin ? m->begin->DebugString().c_str() : <span style="color: #800000">"</span><span style="color: #800000">(begin)</span><span style="color: #800000">"</span><span style="color: #000000">),
  21. (m</span>->end ? m->end->DebugString().c_str() : <span style="color: #800000">"</span><span style="color: #800000">(end)</span><span style="color: #800000">"</span><span style="color: #000000">),
  22. (m</span>->done ? <span style="color: #800000">"</span><span style="color: #800000">(end)</span><span style="color: #800000">"</span><span style="color: #000000"> : manual_end.DebugString().c_str()));
  23. } </span><span style="color: #0000ff">else</span> { <span style="color: #008000">//</span><span style="color: #008000">取得自动compaction对象</span>
  24. c = versions_-><span style="color: #000000">PickCompaction();
  25. }
  26. Status status;
  27. </span><span style="color: #0000ff">if</span> (c ==<span style="color: #000000"> NULL) {
  28. </span><span style="color: #008000">//</span><span style="color: #008000"> Nothing to do</span>
  29. } <span style="color: #0000ff">else</span> <span style="color: #0000ff">if</span> (!is_manual && c->IsTrivialMove()) {<span style="color: #008000">//</span><span style="color: #008000">2、IsTrivialMove 返回 True,trivial Compaction,则直接将文件移入 level + 1 层即可
  30. </span><span style="color: #008000">//</span><span style="color: #008000"> Move file to next level</span>
  31. assert(c->num_input_files(<span style="color: #800080">0</span>) == <span style="color: #800080">1</span><span style="color: #000000">);
  32. FileMetaData</span>* f = c->input(<span style="color: #800080">0</span>, <span style="color: #800080">0</span><span style="color: #000000">);
  33. c</span>->edit()->DeleteFile(c->level(), f-><span style="color: #000000">number);
  34. c</span>->edit()->AddFile(c->level() + <span style="color: #800080">1</span>, f->number, f-><span style="color: #000000">file_size,
  35. f</span>->smallest, f-><span style="color: #000000">largest);
  36. status </span>= versions_->LogAndApply(c->edit(), &<span style="color: #000000">mutex_);
  37. </span><span style="color: #0000ff">if</span> (!<span style="color: #000000">status.ok()) {
  38. RecordBackgroundError(status);
  39. }
  40. VersionSet::LevelSummaryStorage tmp;
  41. Log(options_.info_log, </span><span style="color: #800000">"</span><span style="color: #800000">Moved #%lld to level-%d %lld bytes %s: %s\n</span><span style="color: #800000">"</span><span style="color: #000000">,
  42. static_cast</span><unsigned <span style="color: #0000ff">long</span> <span style="color: #0000ff">long</span>>(f-><span style="color: #000000">number),
  43. c</span>->level() + <span style="color: #800080">1</span><span style="color: #000000">,
  44. static_cast</span><unsigned <span style="color: #0000ff">long</span> <span style="color: #0000ff">long</span>>(f-><span style="color: #000000">file_size),
  45. status.ToString().c_str(),
  46. versions_</span>->LevelSummary(&<span style="color: #000000">tmp));
  47. } </span><span style="color: #0000ff">else</span> { <span style="color: #008000">//</span><span style="color: #008000">3、一般的合并 </span>
  48. CompactionState* compact = <span style="color: #0000ff">new</span><span style="color: #000000"> CompactionState(c);
  49. status </span>= DoCompactionWork(compact); <span style="color: #008000">//</span><span style="color: #008000">进行compaction</span>
  50. <span style="color: #0000ff">if</span> (!<span style="color: #000000">status.ok()) {
  51. RecordBackgroundError(status);
  52. }
  53. CleanupCompaction(compact);
  54. c</span>->ReleaseInputs(); <span style="color: #008000">//</span><span style="color: #008000"> input的文件引用计数减少1</span>
  55. DeleteObsoleteFiles(); <span style="color: #008000">//</span><span style="color: #008000">删除无用文件</span>
  56. <span style="color: #000000"> }
  57. </span><span style="color: #0000ff">delete</span><span style="color: #000000"> c;
  58. </span><span style="color: #0000ff">if</span><span style="color: #000000"> (status.ok()) {
  59. </span><span style="color: #008000">//</span><span style="color: #008000"> Done</span>
  60. } <span style="color: #0000ff">else</span> <span style="color: #0000ff">if</span><span style="color: #000000"> (shutting_down_.Acquire_Load()) {
  61. </span><span style="color: #008000">//</span><span style="color: #008000"> Ignore compaction errors found during shutting down</span>
  62. } <span style="color: #0000ff">else</span><span style="color: #000000"> {
  63. Log(options_.info_log,
  64. </span><span style="color: #800000">"</span><span style="color: #800000">Compaction error: %s</span><span style="color: #800000">"</span><span style="color: #000000">, status.ToString().c_str());
  65. }
  66. </span><span style="color: #0000ff">if</span><span style="color: #000000"> (is_manual) {
  67. ManualCompaction</span>* m = manual_compaction_; <span style="color: #008000">//</span><span style="color: #008000">标记手动compaction任务完成</span>
  68. <span style="color: #0000ff">if</span> (!<span style="color: #000000">status.ok()) {
  69. m</span>->done = <span style="color: #0000ff">true</span><span style="color: #000000">;
  70. }
  71. </span><span style="color: #0000ff">if</span> (!m-><span style="color: #000000">done) {
  72. </span><span style="color: #008000">//</span><span style="color: #008000"> We only compacted part of the requested range. Update *m
  73. </span><span style="color: #008000">//</span><span style="color: #008000"> to the range that is left to be compacted.</span>
  74. m->tmp_storage =<span style="color: #000000"> manual_end;
  75. m</span>->begin = &m-><span style="color: #000000">tmp_storage;
  76. }
  77. manual_compaction_ </span>=<span style="color: #000000"> NULL;
  78. }
  79. }</span>

 首行mutex_.AssertHeld(),Mutex的AssertHeld函数实现默认为空,在很多函数的实现内有调用,其作用如下: 

As you have observed it does nothing in the default implementation. The function seems to be a placeholder for checking whether a particular thread holds a mutex and optionally abort if it doesn’t. This would be equivalent to the normal asserts we use for variables but applied on mutexes.
I think the reason it is not implemented yet is we don’t have an equivalent light weight function to assert whether a thread holds a lock in pthread_mutex_t used in the default implementation. Some platforms which has that capability could fill this implementation as part of porting process. Searching online I did find some implementation for this function in the windows port of leveldb. I can see one way to implement it using a wrapper class over pthread_mutex_t and setting some sort of a thread id variable to indicate which thread(s) currently holds the mutex, but it will have to be carefully implemented given the race conditions that can arise.

Memtable的合并

Compaction首先检查imm_,及时将已写满的memtable写入磁盘sstable文件,对Memtable的合并,调用DBImpl::CompactMemTable()完成:

  1. <span style="color: #0000ff">void</span><span style="color: #000000"> DBImpl::CompactMemTable() {
  2. mutex_.AssertHeld();
  3. assert(imm_ </span>!= NULL);<span style="color: #008000">//</span><span style="color: #008000">imm_不能为空</span>
  4. <span style="color: #000000"> VersionEdit edit;
  5. Version</span>* <span style="color: #0000ff">base</span> = versions_-><span style="color: #000000">current();
  6. </span><span style="color: #0000ff">base</span>-><span style="color: #000000">Ref();
  7. Status s </span>= WriteLevel0Table(imm_, &edit, <span style="color: #0000ff">base</span>);<span style="color: #008000">//</span><span style="color: #008000">将Memtable转化为.sst文件,写入level0 sst table,并写入到edit中</span>
  8. <span style="color: #0000ff">base</span>-><span style="color: #000000">Unref();
  9. </span><span style="color: #0000ff">if</span><span style="color: #000000"> (s.ok()) {
  10. edit.SetPrevLogNumber(</span><span style="color: #800080">0</span><span style="color: #000000">);
  11. edit.SetLogNumber(logfile_number_); </span><span style="color: #008000">//</span><span style="color: #008000"> Earlier logs no longer needed</span>
  12. s = versions_->LogAndApply(&edit, &mutex_);<span style="color: #008000">//</span><span style="color: #008000">应用edit中记录的变化,来生成新的版本 </span>
  13. <span style="color: #000000"> }
  14. </span><span style="color: #0000ff">if</span><span style="color: #000000"> (s.ok()) {
  15. </span><span style="color: #008000">//</span><span style="color: #008000"> Commit to the new state</span>
  16. imm_-><span style="color: #000000">Unref();
  17. imm_ </span>=<span style="color: #000000"> NULL;
  18. has_imm_.Release_Store(NULL);
  19. DeleteObsoleteFiles();
  20. } </span><span style="color: #0000ff">else</span><span style="color: #000000"> {
  21. RecordBackgroundError(s);
  22. }
  23. }</span>

其中CompactMemTable()主要调用了两个函数:WriteLevel0Table()和versions_->LogAndApply()

CompactMemTable()首先调用WriteLevel0Table(),源码内容如下:

  1. Status DBImpl::WriteLevel0Table(MemTable* mem, VersionEdit*<span style="color: #000000"> edit,
  2. Version</span>* <span style="color: #0000ff">base</span><span style="color: #000000">) {
  3. mutex_.AssertHeld();
  4. FileMetaData meta;
  5. meta.number </span>= versions_->NewFileNumber();<span style="color: #008000">//</span><span style="color: #008000">获取新生成的.sst文件的编号</span>
  6. <span style="color: #000000"> pending_outputs_.insert(meta.number);
  7. Iterator</span>* iter = mem->NewIterator();<span style="color: #008000">//</span><span style="color: #008000">用于遍历Memtable中的数据</span>
  8. <span style="color: #000000">
  9. Status s;
  10. {
  11. mutex_.Unlock();
  12. s </span>= BuildTable(dbname_, env_, options_, table_cache_, iter, &meta);<span style="color: #008000">//</span><span style="color: #008000">创建.sst文件,并将其相关信息记录在meta中</span>
  13. <span style="color: #000000"> mutex_.Lock();
  14. }
  15. </span><span style="color: #0000ff">delete</span> iter; <span style="color: #008000">//</span><span style="color: #008000">iter用完之后一定要删除</span>
  16. <span style="color: #000000"> pending_outputs_.erase(meta.number);
  17. </span><span style="color: #0000ff">int</span> level = <span style="color: #800080">0</span><span style="color: #000000">;
  18. </span><span style="color: #0000ff">if</span> (s.ok() && meta.file_size > <span style="color: #800080">0</span><span style="color: #000000">) {
  19. </span><span style="color: #0000ff">const</span> Slice min_user_key =<span style="color: #000000"> meta.smallest.user_key();
  20. </span><span style="color: #0000ff">const</span> Slice max_user_key =<span style="color: #000000"> meta.largest.user_key();
  21. </span><span style="color: #0000ff">if</span> (<span style="color: #0000ff">base</span> !=<span style="color: #000000"> NULL) {
  22. level </span>= <span style="color: #0000ff">base</span>->PickLevelForMemTableOutput(min_user_key, max_user_key);<span style="color: #008000">//</span><span style="color: #008000">为合并的输出文件选择合适的level</span>
  23. <span style="color: #000000"> }
  24. edit</span>->AddFile(level, meta.number, meta.file_size,meta.smallest, meta.largest);<span style="color: #008000">//</span><span style="color: #008000">将生成的.sst文件加入到该level</span>
  25. <span style="color: #000000"> }
  26. </span><span style="color: #0000ff">return</span><span style="color: #000000"> s;
  27. }</span>

WriteLevel0Table()首先调用BuildTable()将Immutable Memtable中所有的数据写入到一个.sst文件中,并将.sst文件的信息(文件编号,Key值范围,文件大小)记录到变量meta中.由于Memtable是基于Skiplist的,是一个有序表,因此在写入.sst文件时,Key值也是从小到大来排列的.可以发现,将Memtable中的数据转换为SSTable时,是将所有记录都写入SSTable的,要删除的记录也一样.删除操作会在更高level的Compaction中完成.因此level 0中可能会存在Key值相同的记录.2. 然后调用PickLevelForMemTableOutput()为Memtable合并的输出文件选择合适的level,并调用edit->AddFile()将生成的.sst文件加入到该level中.

然后CompactMemTable()调用db文件夹下version_set.cc文件中的versions_->LogAndApply()基于当前版本和更改edit来得到一个新版本.

Trivial Compaction

由之前的分析可知,is_manual默认为false,会调用PickCompaction()来选出要进行合并的level和相应的输入文件.当c->IsTrivialMove()满足时,则直接将文件移动到下一level.

  1. c = versions_-><span style="color: #000000">PickCompaction();
  2. Status status;
  3. </span><span style="color: #0000ff">if</span> (c ==<span style="color: #000000"> NULL) {
  4. </span><span style="color: #008000">//</span><span style="color: #008000"> Nothing to do</span>
  5. } <span style="color: #0000ff">else</span> <span style="color: #0000ff">if</span> (!is_manual && c-><span style="color: #000000">IsTrivialMove()) {
  6. </span><span style="color: #008000">//</span><span style="color: #008000"> Move file to next level</span>
  7. assert(c->num_input_files(<span style="color: #800080">0</span>) == <span style="color: #800080">1</span><span style="color: #000000">);
  8. FileMetaData</span>* f = c->input(<span style="color: #800080">0</span>, <span style="color: #800080">0</span><span style="color: #000000">);
  9. c</span>->edit()->DeleteFile(c->level(), f->number); <span style="color: #008000">//</span><span style="color: #008000">将文件从该层删除</span>
  10. c->edit()->AddFile(c->level() + <span style="color: #800080">1</span>, f->number, f->file_size, <span style="color: #008000">//</span><span style="color: #008000">将该文件加入到下一level</span>
  11. f->smallest, f-><span style="color: #000000">largest);
  12. status </span>= versions_->LogAndApply(c->edit(), &mutex_); <span style="color: #008000">//</span><span style="color: #008000">应用更改,创建新的Version</span>
  13. }

首先调用db文件夹下version_set.cc文件中的VersionSet::PickCompaction()为接下来的Compaction操作准备输入数据,由之前对Compaction的数据结构分析可知,Compaction操作有两种触发方式:某一level的文件数太多和某一文件的查找次数超过允许值,在进行合并时,将优先考虑文件数过多的情况. 

  1. Compaction*<span style="color: #000000"> VersionSet::PickCompaction() {
  2. Compaction</span>*<span style="color: #000000"> c;
  3. </span><span style="color: #0000ff">int</span><span style="color: #000000"> level;
  4. </span><span style="color: #0000ff">const</span> <span style="color: #0000ff">bool</span> size_compaction = (current_->compaction_score_ >= <span style="color: #800080">1</span>);<span style="color: #008000">//</span><span style="color: #008000">文件数过多</span>
  5. <span style="color: #0000ff">const</span> <span style="color: #0000ff">bool</span> seek_compaction = (current_->file_to_compact_ != NULL);<span style="color: #008000">//</span><span style="color: #008000">某一文件的查找次数太多</span>
  6. <span style="color: #0000ff">if</span> (size_compaction) {<span style="color: #008000">//</span><span style="color: #008000">文件数太多优先考虑</span>
  7. level = current_->compaction_level_; <span style="color: #008000">//</span><span style="color: #008000">要进行Compaction的level</span>
  8. c = <span style="color: #0000ff">new</span><span style="color: #000000"> Compaction(level);
  9. </span><span style="color: #008000">//</span><span style="color: #008000">每一层有一个compact_pointer,用于记录compaction key,这样可以进行循环compaction</span>
  10. <span style="color: #0000ff">for</span> (size_t i = <span style="color: #800080">0</span>; i < current_->files_[level].size(); i++) { <span style="color: #008000">//</span><span style="color: #008000">从待合并的level中选择合适的文件完成合并操作</span>
  11. FileMetaData* f = current_->files_[level][i]; <span style="color: #008000">//</span><span style="color: #008000">level层中的第i个文件</span>
  12. <span style="color: #0000ff">if</span> (compact_pointer_[level].empty() || <span style="color: #008000">//</span><span style="color: #008000">compact_pointer_中记录的是下次合并的起始Key值,为空时都可以进行合并</span>
  13. icmp_.Compare(f->largest.Encode(), compact_pointer_[level]) > <span style="color: #800080">0</span>) { <span style="color: #008000">//</span><span style="color: #008000">或者f的最大Key值大于起始值</span>
  14. c->inputs_[<span style="color: #800080">0</span>].push_back(f);<span style="color: #008000">//</span><span style="color: #008000">则该文件可以参与合并,将其加入到level输入文件中</span>
  15. <span style="color: #0000ff">break</span><span style="color: #000000">;
  16. }
  17. }
  18. </span><span style="color: #0000ff">if</span> (c->inputs_[<span style="color: #800080">0</span>].empty()) { <span style="color: #008000">//</span><span style="color: #008000">若level输入为空,则将level的第一个文件加入到输入中</span>
  19. c->inputs_[<span style="color: #800080">0</span>].push_back(current_->files_[level][<span style="color: #800080">0</span><span style="color: #000000">]);
  20. }
  21. } </span><span style="color: #0000ff">else</span> <span style="color: #0000ff">if</span> (seek_compaction) {<span style="color: #008000">//</span><span style="color: #008000">然后考虑查找次数过多的情况</span>
  22. level = current_-><span style="color: #000000">file_to_compact_level_;
  23. c </span>= <span style="color: #0000ff">new</span><span style="color: #000000"> Compaction(level);
  24. c</span>->inputs_[<span style="color: #800080">0</span>].push_back(current_->file_to_compact_);<span style="color: #008000">//</span><span style="color: #008000">将待合并的文件作为level层的输入</span>
  25. } <span style="color: #0000ff">else</span><span style="color: #000000"> {
  26. </span><span style="color: #0000ff">return</span><span style="color: #000000"> NULL;
  27. }
  28. c</span>->input_version_ =<span style="color: #000000"> current_;
  29. c</span>->input_version_-><span style="color: #000000">Ref();
  30. </span><span style="color: #008000">//</span><span style="color: #008000">level 0中的Key值是可以重复的,因此Key值范围可能相互覆盖,把所有重叠都找出来,一起做compaction</span>
  31. <span style="color: #0000ff">if</span> (level == <span style="color: #800080">0</span><span style="color: #000000">) {
  32. InternalKey smallest, largest;
  33. GetRange(c</span>->inputs_[<span style="color: #800080">0</span>], &smallest, &largest);<span style="color: #008000">//</span><span style="color: #008000">待合并的level层的文件的Key值范围</span>
  34. current_->GetOverlappingInputs(<span style="color: #800080">0</span>, &smallest, &largest, &c->inputs_[<span style="color: #800080">0</span><span style="color: #000000">]);
  35. assert(</span>!c->inputs_[<span style="color: #800080">0</span><span style="color: #000000">].empty());
  36. }
  37. SetupOtherInputs(c);</span><span style="color: #008000">//</span><span style="color: #008000">获取待合并的level+1层的输入</span>
  38. <span style="color: #0000ff">return</span><span style="color: #000000"> c;
  39. }</span>

 

然后判断是否为trivial Compaction,当为trivial Compaction时,只需要简单的将level层的文件移动到level +1 层即可

  1. <span style="color: #0000ff">bool</span> Compaction::IsTrivialMove() <span style="color: #0000ff">const</span><span style="color: #000000"> {
  2. </span><span style="color: #0000ff">return</span> (num_input_files(<span style="color: #800080">0</span>) == <span style="color: #800080">1</span> && <span style="color: #008000">//</span><span style="color: #008000">level层只有1个文件</span>
  3. num_input_files(<span style="color: #800080">1</span>) == <span style="color: #800080">0</span> && <span style="color: #008000">//</span><span style="color: #008000">level+1层没有文件</span>
  4. TotalFileSize(grandparents_) <= kMaxGrandParentOverlapBytes);<span style="color: #008000">//</span><span style="color: #008000">level+2层文件总大小不超过最大覆盖范围,否则会导致后面的merge需要很大的开销</span>
  5. }

最终完成完成Compaction操作

  1. c->edit()->DeleteFile(c->level(), f-><span style="color: #000000">number);
  2. c</span>->edit()->AddFile(c->level() + <span style="color: #800080">1</span>, f->number, f->file_size,f->smallest, f-><span style="color: #000000">largest);
  3. status </span>= versions_->LogAndApply(c->edit(), &mutex_);

一般的合并

一般的合并调用DBImpl::DoCompactionWork()完成,compact是调用VersionSet::PickCompacttion()得到的,与之前的trivial Compaction相同.不同level之间,可能存在Key值相同的记录,但是记录的seq不同.由之前的分析可知,最新的数据存放在较低的level中,其对应的seq也一定比level+1中的记录的seq要大,因此当出现相同Key值的记录时,只需要记录第一条记录,后面的都可以丢弃.level 0中也可能存在Key值相同的数据,其后面的seq也不同.数据越新,其对应的seq越大,且记录在level 0中的记录是按照user_key递增,seq递减的方式存储的,则相同user_key对应的记录是聚集在一起的,且按照seq递减的方式存放的.在更高层的Compaction时,只需要处理第一条出现的user_key相同的记录即可,后面的相同user_key的记录都可以丢弃.因此合并后的level +1层的文件中不会存在Key值相同的记录.删除记录的操作也会在此时完成,删除数据的记录会被丢弃,而不会被写入到更高level的文件中.

 

  1. Status DBImpl::DoCompactionWork(CompactionState*<span style="color: #000000"> compact) {
  2. </span><span style="color: #0000ff">if</span><span style="color: #000000"> (snapshots_.empty()) {
  3. compact</span>->smallest_snapshot = versions_-><span style="color: #000000">LastSequence();
  4. } </span><span style="color: #0000ff">else</span><span style="color: #000000"> {
  5. compact</span>->smallest_snapshot = snapshots_.oldest()-><span style="color: #000000">number_;
  6. }
  7. mutex_.Unlock();
  8. </span><span style="color: #008000">//</span><span style="color: #008000">生成iterator:遍历要compaction的数据</span>
  9. Iterator* input = versions_->MakeInputIterator(compact->compaction);<span style="color: #008000">//</span><span style="color: #008000">用于遍历待合并的每一个文件</span>
  10. input-><span style="color: #000000">SeekToFirst();
  11. Status status;
  12. ParsedInternalKey ikey;
  13. std::</span><span style="color: #0000ff">string</span><span style="color: #000000"> current_user_key;
  14. </span><span style="color: #0000ff">bool</span> has_current_user_key = <span style="color: #0000ff">false</span><span style="color: #000000">;
  15. SequenceNumber last_sequence_for_key </span>=<span style="color: #000000"> kMaxSequenceNumber;
  16. </span><span style="color: #0000ff">for</span> (; input->Valid() && !<span style="color: #000000">shutting_down_.Acquire_Load(); ) {
  17. </span><span style="color: #0000ff">if</span> (has_imm_.NoBarrier_Load() != NULL) { <span style="color: #008000">//</span><span style="color: #008000">immutable memtable的优先级最高</span>
  18. <span style="color: #000000"> mutex_.Lock();
  19. </span><span style="color: #0000ff">if</span> (imm_ != NULL) { <span style="color: #008000">//</span><span style="color: #008000">当imm_非空时,合并Memtable</span>
  20. <span style="color: #000000"> CompactMemTable();
  21. bg_cv_.SignalAll(); </span><span style="color: #008000">//</span><span style="color: #008000"> Wakeup MakeRoomForWrite() if necessary</span>
  22. <span style="color: #000000"> }
  23. mutex_.Unlock();
  24. }
  25. Slice key </span>= input-><span style="color: #000000">key();
  26. </span><span style="color: #0000ff">if</span> (compact->compaction->ShouldStopBefore(key) && <span style="color: #008000">//</span><span style="color: #008000">是否需要停止Compaction,中途输出compaction的结果,避免compaction结果和level N+2 files有过多的重叠</span>
  27. compact->builder !=<span style="color: #000000"> NULL) {
  28. status </span>=<span style="color: #000000"> FinishCompactionOutputFile(compact, input);
  29. }
  30. </span><span style="color: #0000ff">bool</span> drop = <span style="color: #0000ff">false</span><span style="color: #000000">;
  31. </span><span style="color: #0000ff">if</span> (!ParseInternalKey(key, &<span style="color: #000000">ikey)) {
  32. current_user_key.clear();
  33. has_current_user_key </span>= <span style="color: #0000ff">false</span><span style="color: #000000">;
  34. last_sequence_for_key </span>=<span style="color: #000000"> kMaxSequenceNumber;
  35. } </span><span style="color: #0000ff">else</span><span style="color: #000000"> {
  36. </span><span style="color: #0000ff">if</span> (!has_current_user_key || <span style="color: #008000">//</span><span style="color: #008000">获取当前的user_key和sequence</span>
  37. user_comparator()-><span style="color: #000000">Compare(ikey.user_key,
  38. Slice(current_user_key)) </span>!= <span style="color: #800080">0</span>) { <span style="color: #008000">//</span><span style="color: #008000">可能存在Key值相同但seq不同的记录
  39. </span><span style="color: #008000">//</span><span style="color: #008000"> 此时是这个Key第一次出现</span>
  40. <span style="color: #000000"> current_user_key.assign(ikey.user_key.data(), ikey.user_key.size());
  41. has_current_user_key </span>= <span style="color: #0000ff">true</span><span style="color: #000000">;
  42. last_sequence_for_key </span>= kMaxSequenceNumber;<span style="color: #008000">//</span><span style="color: #008000">则将其seq设为最大值,表示第一次出现</span>
  43. <span style="color: #000000"> }
  44. </span><span style="color: #0000ff">if</span> (last_sequence_for_key <= compact->smallest_snapshot) {<span style="color: #008000">//</span><span style="color: #008000">表示key已经出现过,否则seq应为KMaxSequenceNumber</span>
  45. drop = <span style="color: #0000ff">true</span>; <span style="color: #008000">//</span><span style="color: #008000"> (A) </span><span style="color: #008000">//</span><span style="color: #008000">之前已经存在Key值相同的记录,丢弃</span>
  46. } <span style="color: #0000ff">else</span> <span style="color: #0000ff">if</span> (ikey.type == kTypeDeletion && <span style="color: #008000">//</span><span style="color: #008000">要删除该记录</span>
  47. ikey.sequence <= compact->smallest_snapshot && <span style="color: #008000">//</span><span style="color: #008000">记录的序号比数据库之前的最小序号还小</span>
  48. compact->compaction->IsBaseLevelForKey(ikey.user_key)) { <span style="color: #008000">//</span><span style="color: #008000">高的level中没有数据</span>
  49. drop = <span style="color: #0000ff">true</span>; <span style="color: #008000">//</span><span style="color: #008000">此时要丢弃该记录</span>
  50. <span style="color: #000000"> }
  51. last_sequence_for_key </span>= ikey.sequence;<span style="color: #008000">//</span><span style="color: #008000">上次出现的记录对应的sequence,用于判断后面出现相同Key值的情况</span>
  52. <span style="color: #000000"> }
  53. </span><span style="color: #0000ff">if</span> (!drop) { <span style="color: #008000">//</span><span style="color: #008000">如果不需要丢弃该记录</span>
  54. <span style="color: #0000ff">if</span> (compact->builder ==<span style="color: #000000"> NULL) {
  55. status </span>= OpenCompactionOutputFile(compact);<span style="color: #008000">//</span><span style="color: #008000">若需要,则创建一个.sst文件,用于存放合并后的数据</span>
  56. <span style="color: #000000"> }
  57. </span><span style="color: #0000ff">if</span> (compact->builder->NumEntries() == <span style="color: #800080">0</span><span style="color: #000000">) {
  58. compact</span>->current_output()-><span style="color: #000000">smallest.DecodeFrom(key);
  59. }
  60. compact</span>->current_output()-><span style="color: #000000">largest.DecodeFrom(key);
  61. compact</span>->builder->Add(key, input->value());<span style="color: #008000">//</span><span style="color: #008000">将记录写入.sst文件</span>
  62. <span style="color: #0000ff">if</span> (compact->builder->FileSize() >=<span style="color: #000000">
  63. compact</span>->compaction->MaxOutputFileSize()) { <span style="color: #008000">//</span><span style="color: #008000">当.sst文件超过最大值时</span>
  64. status = FinishCompactionOutputFile(compact, input);<span style="color: #008000">//</span><span style="color: #008000">完成Compaction输出文件</span>
  65. <span style="color: #000000"> }
  66. }
  67. input</span>->Next(); <span style="color: #008000">//</span><span style="color: #008000">处理下一个文件</span>
  68. <span style="color: #000000"> }
  69. </span><span style="color: #0000ff">if</span> (status.ok() && compact->builder !=<span style="color: #000000"> NULL) {
  70. status </span>=<span style="color: #000000"> FinishCompactionOutputFile(compact, input);
  71. }
  72. </span><span style="color: #0000ff">if</span><span style="color: #000000"> (status.ok()) {
  73. status </span>= input-><span style="color: #000000">status();
  74. }
  75. </span><span style="color: #0000ff">delete</span><span style="color: #000000"> input;
  76. input </span>=<span style="color: #000000"> NULL;
  77. </span><span style="color: #008000">//</span><span style="color: #008000">更新compaction的一些统计数据</span>
  78. <span style="color: #000000"> CompactionStats stats;
  79. stats.micros </span>= env_->NowMicros() - start_micros -<span style="color: #000000"> imm_micros;
  80. </span><span style="color: #0000ff">for</span> (<span style="color: #0000ff">int</span> which = <span style="color: #800080">0</span>; which < <span style="color: #800080">2</span>; which++<span style="color: #000000">) {
  81. </span><span style="color: #0000ff">for</span> (<span style="color: #0000ff">int</span> i = <span style="color: #800080">0</span>; i < compact->compaction->num_input_files(which); i++<span style="color: #000000">) {
  82. stats.bytes_read </span>+= compact->compaction->input(which, i)-><span style="color: #000000">file_size;
  83. }
  84. }
  85. </span><span style="color: #0000ff">for</span> (size_t i = <span style="color: #800080">0</span>; i < compact->outputs.size(); i++<span style="color: #000000">) {
  86. stats.bytes_written </span>+= compact-><span style="color: #000000">outputs[i].file_size;
  87. }
  88. mutex_.Lock();
  89. stats_[compact</span>->compaction->level() + <span style="color: #800080">1</span><span style="color: #000000">].Add(stats);
  90. </span><span style="color: #0000ff">if</span><span style="color: #000000"> (status.ok()) {
  91. status </span>= InstallCompactionResults(compact);<span style="color: #008000">//</span><span style="color: #008000">完成合并</span>
  92. <span style="color: #000000"> }
  93. </span><span style="color: #0000ff">if</span> (!<span style="color: #000000">status.ok()) {
  94. RecordBackgroundError(status);
  95. }
  96. VersionSet::LevelSummaryStorage tmp;
  97. Log(options_.info_log,
  98. </span><span style="color: #800000">"</span><span style="color: #800000">compacted to: %s</span><span style="color: #800000">"</span>, versions_->LevelSummary(&<span style="color: #000000">tmp));
  99. </span><span style="color: #0000ff">return</span><span style="color: #000000"> status;
  100. }</span>

 

首先将可以留下的记录写入到.sst文件中,并将相关信息保存在变量compact中,然后调用InstallCompactionResults()将所做的改动加入到VersionEdit中,再调用LogAndApply()来得到新的版本.

LogAndApply()

在上面三种不同的Compaction操作中,最终当对当前版本的更改VersionEdit全部完成后,都会调用VersionSet::LogAndApply()来应用更改,创建新版本.edit中保存了level和level+1层要删除和增加的文件.

  1. Status VersionSet::LogAndApply(VersionEdit* edit, port::Mutex*<span style="color: #000000"> mu) {
  2. Version</span>* v = <span style="color: #0000ff">new</span> Version(<span style="color: #0000ff">this</span>); <span style="color: #008000">//</span><span style="color: #008000">创建一个新Version</span>
  3. <span style="color: #000000"> {
  4. Builder builder(</span><span style="color: #0000ff">this</span>, current_);<span style="color: #008000">//</span><span style="color: #008000">基于当前Version创建一个builder变量</span>
  5. builder.Apply(edit);<span style="color: #008000">//</span><span style="color: #008000">将edit中记录的要增加、删除的文件加入到builder类中</span>
  6. builder.SaveTo(v);<span style="color: #008000">//</span><span style="color: #008000">然后将edit中的记录保存到新创建的Version中,这样就得到了一个新的版本</span>
  7. <span style="color: #000000"> }
  8. Finalize(v);</span><span style="color: #008000">//</span><span style="color: #008000">根据各层文件数来判断是否还需要进行Compaction</span>
  9. <span style="color: #000000">
  10. std::</span><span style="color: #0000ff">string</span><span style="color: #000000"> new_manifest_file;
  11. Status s;
  12. </span><span style="color: #0000ff">if</span> (descriptor_log_ == NULL) { <span style="color: #008000">//</span><span style="color: #008000">只会在第一次调用时进入</span>
  13. assert(descriptor_file_ ==<span style="color: #000000"> NULL);
  14. new_manifest_file </span>= DescriptorFileName(dbname_, manifest_file_number_);<span style="color: #008000">//</span><span style="color: #008000">创建一个新的Manifest文件</span>
  15. edit-><span style="color: #000000">SetNextFile(next_file_number_);
  16. s </span>= env_->NewWritableFile(new_manifest_file, &<span style="color: #000000">descriptor_file_);
  17. </span><span style="color: #0000ff">if</span><span style="color: #000000"> (s.ok()) {
  18. descriptor_log_ </span>= <span style="color: #0000ff">new</span><span style="color: #000000"> log::Writer(descriptor_file_);
  19. s </span>= WriteSnapshot(descriptor_log_);<span style="color: #008000">//</span><span style="color: #008000">快照,系统开始时完整记录数据库的所有信息</span>
  20. <span style="color: #000000"> }
  21. }
  22. {
  23. mu</span>-><span style="color: #000000">Unlock();
  24. </span><span style="color: #0000ff">if</span><span style="color: #000000"> (s.ok()) {
  25. std::</span><span style="color: #0000ff">string</span><span style="color: #000000"> record;
  26. edit</span>->EncodeTo(&<span style="color: #000000">record);
  27. s </span>= descriptor_log_->AddRecord(record);<span style="color: #008000">//</span><span style="color: #008000">将数据库的变化记录到Manifest文件中</span>
  28. <span style="color: #0000ff">if</span><span style="color: #000000"> (s.ok()) {
  29. s </span>= descriptor_file_-><span style="color: #000000">Sync();
  30. }
  31. }
  32. </span><span style="color: #0000ff">if</span> (s.ok() && !<span style="color: #000000">new_manifest_file.empty()) {
  33. s </span>=<span style="color: #000000"> SetCurrentFile(env_, dbname_, manifest_file_number_);
  34. }
  35. mu</span>-><span style="color: #000000">Lock();
  36. }
  37. </span><span style="color: #0000ff">if</span><span style="color: #000000"> (s.ok()) {
  38. AppendVersion(v); </span><span style="color: #008000">//</span><span style="color: #008000">将新得到的Version插入到所有Version形成的双向链表的尾部</span>
  39. log_number_ = edit-><span style="color: #000000">log_number_;
  40. prev_log_number_ </span>= edit-><span style="color: #000000">prev_log_number_;
  41. }
  42. }
  43. </span><span style="color: #0000ff">return</span><span style="color: #000000"> s;
  44. }</span>

为了重启之后能恢复数据库之前的状态,就需要将数据库的历史变化信息记录下来,这些信息都是记录在Manifest文件中的.为了节省空间和时间,leveldb采用的是在系统开始完整的所有数据库的信息(WriteSnapShot()),以后则只记录数据库的变化,即VersionEdit中的信息(descriptor_log_->AddRecord()).恢复时,只需要根据Manifest中的信息就可以一步步的恢复到上次的状态.

VersionSet::LogAndApply首先创建一个新的Version,然后调用builder.Apply(edit)将edit中所有要删除、增加的文件编号记录下来,其源码如下:

  1. <span style="color: #008000">//</span><span style="color: #008000"> Apply all of the edits in *edit to the current state.</span>
  2. <span style="color: #0000ff">void</span> Apply(VersionEdit*<span style="color: #000000"> edit) {
  3. </span><span style="color: #008000">//</span><span style="color: #008000"> 更新每一层下次合并的起始Key值</span>
  4. <span style="color: #0000ff">for</span> (size_t i = <span style="color: #800080">0</span>; i < edit->compact_pointers_.size(); i++<span style="color: #000000">) {
  5. </span><span style="color: #0000ff">const</span> <span style="color: #0000ff">int</span> level = edit-><span style="color: #000000">compact_pointers_[i].first;
  6. vset_</span>->compact_pointer_[level] =<span style="color: #000000">
  7. edit</span>-><span style="color: #000000">compact_pointers_[i].second.Encode().ToString();
  8. }
  9. </span><span style="color: #008000">//</span><span style="color: #008000">将所有要删除的文件加入到levels_[level].deleted_files变量中</span>
  10. <span style="color: #0000ff">const</span> VersionEdit::DeletedFileSet& del = edit-><span style="color: #000000">deleted_files_;
  11. </span><span style="color: #0000ff">for</span> (VersionEdit::DeletedFileSet::const_iterator iter =<span style="color: #000000"> del.begin();
  12. iter </span>!= del.end();++<span style="color: #000000">iter) {
  13. </span><span style="color: #0000ff">const</span> <span style="color: #0000ff">int</span> level = iter-><span style="color: #000000">first;
  14. </span><span style="color: #0000ff">const</span> uint64_t number = iter-><span style="color: #000000">second;
  15. levels_[level].deleted_files.insert(number);
  16. }
  17. </span><span style="color: #008000">//</span><span style="color: #008000"> 将所有新增加的文件加入到levels_[level].added_files中</span>
  18. <span style="color: #0000ff">for</span> (size_t i = <span style="color: #800080">0</span>; i < edit->new_files_.size(); i++<span style="color: #000000">) {
  19. </span><span style="color: #0000ff">const</span> <span style="color: #0000ff">int</span> level = edit-><span style="color: #000000">new_files_[i].first;
  20. FileMetaData</span>* f = <span style="color: #0000ff">new</span> FileMetaData(edit-><span style="color: #000000">new_files_[i].second);
  21. f</span>->refs = <span style="color: #800080">1</span><span style="color: #000000">;
  22. f</span>->allowed_seeks = (f->file_size / <span style="color: #800080">16384</span><span style="color: #000000">);
  23. </span><span style="color: #0000ff">if</span> (f->allowed_seeks < <span style="color: #800080">100</span>) f->allowed_seeks = <span style="color: #800080">100</span><span style="color: #000000">;
  24. levels_[level].deleted_files.erase(f</span>-><span style="color: #000000">number);
  25. levels_[level].added_files</span>-><span style="color: #000000">insert(f);
  26. }
  27. }</span>

然后VersionSet::LogAndApply再调用builder.SaveTo(v)将更改保存到新的Version中,其源码如下:

  1. <span style="color: #0000ff">void</span> SaveTo(Version*<span style="color: #000000"> v) {
  2. BySmallestKey cmp;
  3. cmp.internal_comparator </span>= &vset_-><span style="color: #000000">icmp_;
  4. </span><span style="color: #0000ff">for</span> (<span style="color: #0000ff">int</span> level = <span style="color: #800080">0</span>; level < config::kNumLevels; level++<span style="color: #000000">) {
  5. </span><span style="color: #0000ff">const</span> std::vector<FileMetaData*>& base_files = base_->files_[level];<span style="color: #008000">//</span><span style="color: #008000">当前Version中原有的各个level的.sst文件</span>
  6. std::vector<FileMetaData*>::const_iterator base_iter =<span style="color: #000000"> base_files.begin();
  7. std::vector</span><FileMetaData*>::const_iterator base_end =<span style="color: #000000"> base_files.end();
  8. </span><span style="color: #0000ff">const</span> FileSet* added = levels_[level].added_files;<span style="color: #008000">//</span><span style="color: #008000">对应level新增加的文件</span>
  9. v->files_[level].reserve(base_files.size() + added-><span style="color: #000000">size());
  10. </span><span style="color: #0000ff">for</span> (FileSet::const_iterator added_iter = added-><span style="color: #000000">begin();
  11. added_iter </span>!= added->end();++<span style="color: #000000">added_iter) {
  12. </span><span style="color: #008000">//</span><span style="color: #008000"> 将原有文件中编号比added小的加入到新的Version</span>
  13. <span style="color: #0000ff">for</span> (std::vector<FileMetaData*><span style="color: #000000">::const_iterator bpos
  14. </span>= std::upper_bound(base_iter, base_end, *<span style="color: #000000">added_iter, cmp);
  15. base_iter </span>!= bpos;++<span style="color: #000000">base_iter) {
  16. MaybeAddFile(v, level, </span>*<span style="color: #000000">base_iter);
  17. }
  18. MaybeAddFile(v, level, </span>*added_iter);<span style="color: #008000">//</span><span style="color: #008000">再将新增的文件依次加入到新的Version</span>
  19. <span style="color: #000000"> }
  20. </span><span style="color: #0000ff">for</span> (; base_iter != base_end; ++<span style="color: #000000">base_iter) {
  21. MaybeAddFile(v, level, </span>*base_iter);<span style="color: #008000">//</span><span style="color: #008000">再将原有文件中剩余的部分加入到新的Version</span>
  22. <span style="color: #000000"> }
  23. }
  24. }</span>

bpos = std::upper_bound(base_iter,base_end,*added_iter,cmp); // 返回base_iter到base_end之间,第一个大于*added_iter的iter.假设原有文件的编号为1、3、4、6、8,新增文件的编号为2、5、7,则第一次循环时,bpos为3对应的迭代器,因此base_iter只遍历一个元素,即将编号1加入到新的Version中.总体对新增文件来说,就是首先加入base中编号比它小的,然后再将其加入,然后再继续比那里下一个新增文件,因此最终得到的文件编号顺序是 1、2、3、4、5、6、7、8,即每一层的.sst文件都是按照编号从小到大排列的.这样就得到了新的Version的每一层的所有文件.

 

参考文献:

1.http://blog.csdn.net/u012658346/article/details/45787233

2.http://blog.csdn.net/u012658346/article/details/45788939

3.http://blog.csdn.net/joeyon1985/article/details/47154249

4.http://www.blogjava.net/sandy/archive/2012/03/15/leveldb6.html

5.http://www.pandademo.com/2016/04/compaction-of-sstable-leveldb-part-1-source-dissect-9/

LevelDB的源码阅读(四) Compaction操作

标签:一个   check   sdn   ras   sandy   dad   delete   分析   mmu   

人气教程排行