时间:2021-07-01 10:21:17 帮助过:20人阅读
进行Compaction操作的条件如下:
1.产生了新的immutable table需要写入数据文件
2.某个level的数据规模过大
3.某个文件被无效查询的次数过多(在文件i中查询key,没有找到key,这次查询称为文件i的无效查询)
4.手动compaction
满足以上条件会启动Compaction过程,接下来分析详细的Compaction过程.
Leveldb进行Compaction的入口函数是db文件夹下db_impl.cc文件中的DBImpl::MaybeScheduleCompaction,该函数在每次leveldb进行读写操作时都有可能被调用.源码内容如下:
- <span style="color: #0000ff">void</span><span style="color: #000000"> DBImpl::MaybeScheduleCompaction() {
- mutex_.AssertHeld();
- </span><span style="color: #0000ff">if</span><span style="color: #000000"> (bg_compaction_scheduled_) {
- </span><span style="color: #008000">//</span><span style="color: #008000"> Already scheduled</span>
- } <span style="color: #0000ff">else</span> <span style="color: #0000ff">if</span><span style="color: #000000"> (shutting_down_.Acquire_Load()) {
- </span><span style="color: #008000">//</span><span style="color: #008000"> DB is being deleted; no more background compactions</span>
- } <span style="color: #0000ff">else</span> <span style="color: #0000ff">if</span> (!<span style="color: #000000">bg_error_.ok()) {
- </span><span style="color: #008000">//</span><span style="color: #008000"> Already got an error; no more changes</span>
- } <span style="color: #0000ff">else</span> <span style="color: #0000ff">if</span> (imm_ == NULL &&<span style="color: #000000">
- manual_compaction_ </span>== NULL &&
- !versions_-><span style="color: #000000">NeedsCompaction()) {
- </span><span style="color: #008000">//</span><span style="color: #008000"> No work to be done</span>
- } <span style="color: #0000ff">else</span><span style="color: #000000"> {
- bg_compaction_scheduled_ </span>= <span style="color: #0000ff">true</span><span style="color: #000000">;
- env_</span>->Schedule(&DBImpl::BGWork, <span style="color: #0000ff">this</span>); <span style="color: #008000">//</span><span style="color: #008000">新建后台任务并进行调度</span>
- <span style="color: #000000"> }
- }</span>
首先调用db文件夹下version_set.h中的NeedsCompaction()判断是否需要启动Compact任务.源码内容如下:
- <span style="color: #008000">//</span><span style="color: #008000"> Returns true iff some level needs a compaction.</span>
- <span style="color: #0000ff">bool</span> NeedsCompaction() <span style="color: #0000ff">const</span><span style="color: #000000"> {
- Version</span>* v =<span style="color: #000000"> current_;
- </span><span style="color: #0000ff">return</span> (v->compaction_score_ >= <span style="color: #800080">1</span>) || (v->file_to_compact_ !=<span style="color: #000000"> NULL);
- }</span>
version_set.cc中compaction_score_ 的计算如下:
- <span style="color: #0000ff">void</span> VersionSet::Finalize(Version*<span style="color: #000000"> v) {
- </span><span style="color: #008000">//</span><span style="color: #008000"> Precomputed best level for next compaction</span>
- <span style="color: #0000ff">int</span> best_level = -<span style="color: #800080">1</span><span style="color: #000000">;
- </span><span style="color: #0000ff">double</span> best_score = -<span style="color: #800080">1</span><span style="color: #000000">;
- </span><span style="color: #0000ff">for</span> (<span style="color: #0000ff">int</span> level = <span style="color: #800080">0</span>; level < config::kNumLevels-<span style="color: #800080">1</span>; level++<span style="color: #000000">) {
- </span><span style="color: #0000ff">double</span><span style="color: #000000"> score;
- </span><span style="color: #0000ff">if</span> (level == <span style="color: #800080">0</span><span style="color: #000000">) {
- </span><span style="color: #008000">//</span><span style="color: #008000"> We treat level-0 specially by bounding the number of files
- </span><span style="color: #008000">//</span><span style="color: #008000"> instead of number of bytes for two reasons:
- </span><span style="color: #008000">//</span>
- <span style="color: #008000">//</span><span style="color: #008000"> (1) With larger write-buffer sizes, it is nice not to do too
- </span><span style="color: #008000">//</span><span style="color: #008000"> many level-0 compactions.
- </span><span style="color: #008000">//</span>
- <span style="color: #008000">//</span><span style="color: #008000"> (2) The files in level-0 are merged on every read and
- </span><span style="color: #008000">//</span><span style="color: #008000"> therefore we wish to avoid too many files when the individual
- </span><span style="color: #008000">//</span><span style="color: #008000"> file size is small (perhaps because of a small write-buffer
- </span><span style="color: #008000">//</span><span style="color: #008000"> setting, or very high compression ratios, or lots of
- </span><span style="color: #008000">//</span><span style="color: #008000"> overwrites/deletions).</span>
- score = v->files_[level].size() /<span style="color: #000000">
- static_cast</span><<span style="color: #0000ff">double</span>><span style="color: #000000">(config::kL0_CompactionTrigger);
- } </span><span style="color: #0000ff">else</span><span style="color: #000000"> {
- </span><span style="color: #008000">//</span><span style="color: #008000"> Compute the ratio of current size to size limit.</span>
- <span style="color: #0000ff">const</span> uint64_t level_bytes = TotalFileSize(v-><span style="color: #000000">files_[level]);
- score </span>= static_cast<<span style="color: #0000ff">double</span>>(level_bytes) /<span style="color: #000000"> MaxBytesForLevel(level);
- }
- </span><span style="color: #0000ff">if</span> (score ><span style="color: #000000"> best_score) {
- best_level </span>=<span style="color: #000000"> level;
- best_score </span>=<span style="color: #000000"> score;
- }
- }
- v</span>->compaction_level_ =<span style="color: #000000"> best_level;
- v</span>->compaction_score_ =<span style="color: #000000"> best_score;
- }</span>
注意,这里同时预计算了进行compaction的最佳level.
确认需要启动compaction之后,调用util文件夹下env_posix.cc文件中的PosixEnv::Schedule函数启动Compact过程.
- <span style="color: #0000ff">void</span> PosixEnv::Schedule(<span style="color: #0000ff">void</span> (*function)(<span style="color: #0000ff">void</span>*), <span style="color: #0000ff">void</span>*<span style="color: #000000"> arg) {
- PthreadCall(</span><span style="color: #800000">"</span><span style="color: #800000">lock</span><span style="color: #800000">"</span>, pthread_mutex_lock(&<span style="color: #000000">mu_));
- </span><span style="color: #008000">//</span><span style="color: #008000"> Start background thread if necessary</span>
- <span style="color: #0000ff">if</span> (!<span style="color: #000000">started_bgthread_) {
- started_bgthread_ </span>= <span style="color: #0000ff">true</span><span style="color: #000000">;
- PthreadCall(
- </span><span style="color: #800000">"</span><span style="color: #800000">create thread</span><span style="color: #800000">"</span><span style="color: #000000">,
- pthread_create(</span>&bgthread_, NULL, &PosixEnv::BGThreadWrapper, <span style="color: #0000ff">this</span><span style="color: #000000">));
- }
- </span><span style="color: #008000">//</span><span style="color: #008000"> If the queue is currently empty, the background thread may currently be
- </span><span style="color: #008000">//</span><span style="color: #008000"> waiting.</span>
- <span style="color: #0000ff">if</span><span style="color: #000000"> (queue_.empty()) {
- PthreadCall(</span><span style="color: #800000">"</span><span style="color: #800000">signal</span><span style="color: #800000">"</span>, pthread_cond_signal(&<span style="color: #000000">bgsignal_));
- }
- </span><span style="color: #008000">//</span><span style="color: #008000"> Add to priority queue</span>
- <span style="color: #000000"> queue_.push_back(BGItem());
- queue_.back().function </span>=<span style="color: #000000"> function;
- queue_.back().arg </span>=<span style="color: #000000"> arg;
- PthreadCall(</span><span style="color: #800000">"</span><span style="color: #800000">unlock</span><span style="color: #800000">"</span>, pthread_mutex_unlock(&<span style="color: #000000">mu_));
- }</span>
如果没有后台线程,则创建后台线程,否则新建一个后台执行任务BGItem压入后台线程任务队列,然后调用PosixEnv::BGThreadWrapper唤醒后台线程:
- <span style="color: #0000ff">static</span> <span style="color: #0000ff">void</span>* BGThreadWrapper(<span style="color: #0000ff">void</span>*<span style="color: #000000"> arg) {
- reinterpret_cast</span><PosixEnv*>(arg)-><span style="color: #000000">BGThread();
- </span><span style="color: #0000ff">return</span><span style="color: #000000"> NULL;
- }</span>
BGThreadWrapper调用PosixEnv::BGThread,不断地从后台任务队列中拿到任务,然后执行任务
- <span style="color: #0000ff">void</span><span style="color: #000000"> PosixEnv::BGThread() {
- </span><span style="color: #0000ff">while</span> (<span style="color: #0000ff">true</span><span style="color: #000000">) {
- </span><span style="color: #008000">//</span><span style="color: #008000"> Wait until there is an item that is ready to run</span>
- PthreadCall(<span style="color: #800000">"</span><span style="color: #800000">lock</span><span style="color: #800000">"</span>, pthread_mutex_lock(&<span style="color: #000000">mu_));
- </span><span style="color: #0000ff">while</span><span style="color: #000000"> (queue_.empty()) {
- PthreadCall(</span><span style="color: #800000">"</span><span style="color: #800000">wait</span><span style="color: #800000">"</span>, pthread_cond_wait(&bgsignal_, &<span style="color: #000000">mu_));
- }
- </span><span style="color: #0000ff">void</span> (*function)(<span style="color: #0000ff">void</span>*) =<span style="color: #000000"> queue_.front().function;
- </span><span style="color: #0000ff">void</span>* arg =<span style="color: #000000"> queue_.front().arg;
- queue_.pop_front();
- PthreadCall(</span><span style="color: #800000">"</span><span style="color: #800000">unlock</span><span style="color: #800000">"</span>, pthread_mutex_unlock(&<span style="color: #000000">mu_));
- (</span>*<span style="color: #000000">function)(arg);
- }
- }</span>
回到DBImpl::MaybeScheduleCompaction,方便理解起见这里再重复一遍源码
- <span style="color: #0000ff">void</span><span style="color: #000000"> DBImpl::MaybeScheduleCompaction() {
- mutex_.AssertHeld();
- </span><span style="color: #0000ff">if</span><span style="color: #000000"> (bg_compaction_scheduled_) {
- </span><span style="color: #008000">//</span><span style="color: #008000"> Already scheduled</span>
- } <span style="color: #0000ff">else</span> <span style="color: #0000ff">if</span><span style="color: #000000"> (shutting_down_.Acquire_Load()) {
- </span><span style="color: #008000">//</span><span style="color: #008000"> DB is being deleted; no more background compactions</span>
- } <span style="color: #0000ff">else</span> <span style="color: #0000ff">if</span> (!<span style="color: #000000">bg_error_.ok()) {
- </span><span style="color: #008000">//</span><span style="color: #008000"> Already got an error; no more changes</span>
- } <span style="color: #0000ff">else</span> <span style="color: #0000ff">if</span> (imm_ == NULL &&<span style="color: #000000">
- manual_compaction_ </span>== NULL &&
- !versions_-><span style="color: #000000">NeedsCompaction()) {
- </span><span style="color: #008000">//</span><span style="color: #008000"> No work to be done</span>
- } <span style="color: #0000ff">else</span><span style="color: #000000"> {
- bg_compaction_scheduled_ </span>= <span style="color: #0000ff">true</span><span style="color: #000000">;
- env_</span>->Schedule(&DBImpl::BGWork, <span style="color: #0000ff">this</span>); <span style="color: #008000">//</span><span style="color: #008000">新建后台任务并进行调度</span>
- <span style="color: #000000"> }
- }</span>
之前分析了env_->Schedule进行的调度过程,现在来分析实际进行后台任务的DBImpl::BGWork.DBImpl::BGWork在db文件夹下db_impl.cc文件中.
- <span style="color: #0000ff">void</span> DBImpl::BGWork(<span style="color: #0000ff">void</span>*<span style="color: #000000"> db) {
- reinterpret_cast</span><DBImpl*>(db)-><span style="color: #000000">BackgroundCall();
- }</span>
DBImpl::BGWork调用DBImpl::BackgroundCall(),合并完成后可能导致有的level的文件数过多,因此会再次调用MaybeScheduleCompaction()判断是否需要继续进行合并.
- <span style="color: #0000ff">void</span><span style="color: #000000"> DBImpl::BackgroundCall() {
- MutexLock l(</span>&<span style="color: #000000">mutex_);
- assert(bg_compaction_scheduled_);
- </span><span style="color: #0000ff">if</span><span style="color: #000000"> (shutting_down_.Acquire_Load()) {
- </span><span style="color: #008000">//</span><span style="color: #008000"> No more background work when shutting down.</span>
- } <span style="color: #0000ff">else</span> <span style="color: #0000ff">if</span> (!<span style="color: #000000">bg_error_.ok()) {
- </span><span style="color: #008000">//</span><span style="color: #008000"> No more background work after a background error.</span>
- } <span style="color: #0000ff">else</span><span style="color: #000000"> {
- BackgroundCompaction();
- }
- bg_compaction_scheduled_ </span>= <span style="color: #0000ff">false</span><span style="color: #000000">;
- </span><span style="color: #008000">//</span><span style="color: #008000"> Previous compaction may have produced too many files in a level,
- </span><span style="color: #008000">//</span><span style="color: #008000"> so reschedule another compaction if needed.</span>
- <span style="color: #000000"> MaybeScheduleCompaction();
- bg_cv_.SignalAll();
- }</span>
DBImpl::BackgroundCall()调用 BackgroundCompaction(),在BackgroundCompaction()中分别完成三种不同的Compaction:对Memtable进行合并、 trivial Compaction(直接将文件移动到下一层)以及一般的合并,调用DoCompactionWork()实现.
- <span style="color: #0000ff">void</span><span style="color: #000000"> DBImpl::BackgroundCompaction() {
- mutex_.AssertHeld();
- </span><span style="color: #0000ff">if</span> (imm_ !=<span style="color: #000000"> NULL) {
- CompactMemTable();</span><span style="color: #008000">//</span><span style="color: #008000">1、对Memtable进行合并 </span>
- <span style="color: #0000ff">return</span><span style="color: #000000">;
- }
- Compaction</span>*<span style="color: #000000"> c;
- </span><span style="color: #0000ff">bool</span> is_manual = (manual_compaction_ != NULL);<span style="color: #008000">//</span><span style="color: #008000">manual_compaction默认为NULL,则is_manual默认为false </span>
- <span style="color: #000000"> InternalKey manual_end;
- </span><span style="color: #0000ff">if</span> (is_manual) { <span style="color: #008000">//</span><span style="color: #008000">取得手动compaction对象</span>
- ManualCompaction* m =<span style="color: #000000"> manual_compaction_;
- c </span>= versions_->CompactRange(m->level, m->begin, m-><span style="color: #000000">end);
- m</span>->done = (c ==<span style="color: #000000"> NULL);
- </span><span style="color: #0000ff">if</span> (c !=<span style="color: #000000"> NULL) {
- manual_end </span>= c->input(<span style="color: #800080">0</span>, c->num_input_files(<span style="color: #800080">0</span>) - <span style="color: #800080">1</span>)-><span style="color: #000000">largest;
- }
- Log(options_.info_log,
- </span><span style="color: #800000">"</span><span style="color: #800000">Manual compaction at level-%d from %s .. %s; will stop at %s\n</span><span style="color: #800000">"</span><span style="color: #000000">,
- m</span>-><span style="color: #000000">level,
- (m</span>->begin ? m->begin->DebugString().c_str() : <span style="color: #800000">"</span><span style="color: #800000">(begin)</span><span style="color: #800000">"</span><span style="color: #000000">),
- (m</span>->end ? m->end->DebugString().c_str() : <span style="color: #800000">"</span><span style="color: #800000">(end)</span><span style="color: #800000">"</span><span style="color: #000000">),
- (m</span>->done ? <span style="color: #800000">"</span><span style="color: #800000">(end)</span><span style="color: #800000">"</span><span style="color: #000000"> : manual_end.DebugString().c_str()));
- } </span><span style="color: #0000ff">else</span> { <span style="color: #008000">//</span><span style="color: #008000">取得自动compaction对象</span>
- c = versions_-><span style="color: #000000">PickCompaction();
- }
- Status status;
- </span><span style="color: #0000ff">if</span> (c ==<span style="color: #000000"> NULL) {
- </span><span style="color: #008000">//</span><span style="color: #008000"> Nothing to do</span>
- } <span style="color: #0000ff">else</span> <span style="color: #0000ff">if</span> (!is_manual && c->IsTrivialMove()) {<span style="color: #008000">//</span><span style="color: #008000">2、IsTrivialMove 返回 True,trivial Compaction,则直接将文件移入 level + 1 层即可
- </span><span style="color: #008000">//</span><span style="color: #008000"> Move file to next level</span>
- assert(c->num_input_files(<span style="color: #800080">0</span>) == <span style="color: #800080">1</span><span style="color: #000000">);
- FileMetaData</span>* f = c->input(<span style="color: #800080">0</span>, <span style="color: #800080">0</span><span style="color: #000000">);
- c</span>->edit()->DeleteFile(c->level(), f-><span style="color: #000000">number);
- c</span>->edit()->AddFile(c->level() + <span style="color: #800080">1</span>, f->number, f-><span style="color: #000000">file_size,
- f</span>->smallest, f-><span style="color: #000000">largest);
- status </span>= versions_->LogAndApply(c->edit(), &<span style="color: #000000">mutex_);
- </span><span style="color: #0000ff">if</span> (!<span style="color: #000000">status.ok()) {
- RecordBackgroundError(status);
- }
- VersionSet::LevelSummaryStorage tmp;
- Log(options_.info_log, </span><span style="color: #800000">"</span><span style="color: #800000">Moved #%lld to level-%d %lld bytes %s: %s\n</span><span style="color: #800000">"</span><span style="color: #000000">,
- static_cast</span><unsigned <span style="color: #0000ff">long</span> <span style="color: #0000ff">long</span>>(f-><span style="color: #000000">number),
- c</span>->level() + <span style="color: #800080">1</span><span style="color: #000000">,
- static_cast</span><unsigned <span style="color: #0000ff">long</span> <span style="color: #0000ff">long</span>>(f-><span style="color: #000000">file_size),
- status.ToString().c_str(),
- versions_</span>->LevelSummary(&<span style="color: #000000">tmp));
- } </span><span style="color: #0000ff">else</span> { <span style="color: #008000">//</span><span style="color: #008000">3、一般的合并 </span>
- CompactionState* compact = <span style="color: #0000ff">new</span><span style="color: #000000"> CompactionState(c);
- status </span>= DoCompactionWork(compact); <span style="color: #008000">//</span><span style="color: #008000">进行compaction</span>
- <span style="color: #0000ff">if</span> (!<span style="color: #000000">status.ok()) {
- RecordBackgroundError(status);
- }
- CleanupCompaction(compact);
- c</span>->ReleaseInputs(); <span style="color: #008000">//</span><span style="color: #008000"> input的文件引用计数减少1</span>
- DeleteObsoleteFiles(); <span style="color: #008000">//</span><span style="color: #008000">删除无用文件</span>
- <span style="color: #000000"> }
- </span><span style="color: #0000ff">delete</span><span style="color: #000000"> c;
- </span><span style="color: #0000ff">if</span><span style="color: #000000"> (status.ok()) {
- </span><span style="color: #008000">//</span><span style="color: #008000"> Done</span>
- } <span style="color: #0000ff">else</span> <span style="color: #0000ff">if</span><span style="color: #000000"> (shutting_down_.Acquire_Load()) {
- </span><span style="color: #008000">//</span><span style="color: #008000"> Ignore compaction errors found during shutting down</span>
- } <span style="color: #0000ff">else</span><span style="color: #000000"> {
- Log(options_.info_log,
- </span><span style="color: #800000">"</span><span style="color: #800000">Compaction error: %s</span><span style="color: #800000">"</span><span style="color: #000000">, status.ToString().c_str());
- }
- </span><span style="color: #0000ff">if</span><span style="color: #000000"> (is_manual) {
- ManualCompaction</span>* m = manual_compaction_; <span style="color: #008000">//</span><span style="color: #008000">标记手动compaction任务完成</span>
- <span style="color: #0000ff">if</span> (!<span style="color: #000000">status.ok()) {
- m</span>->done = <span style="color: #0000ff">true</span><span style="color: #000000">;
- }
- </span><span style="color: #0000ff">if</span> (!m-><span style="color: #000000">done) {
- </span><span style="color: #008000">//</span><span style="color: #008000"> We only compacted part of the requested range. Update *m
- </span><span style="color: #008000">//</span><span style="color: #008000"> to the range that is left to be compacted.</span>
- m->tmp_storage =<span style="color: #000000"> manual_end;
- m</span>->begin = &m-><span style="color: #000000">tmp_storage;
- }
- manual_compaction_ </span>=<span style="color: #000000"> NULL;
- }
- }</span>
首行mutex_.AssertHeld(),Mutex的AssertHeld函数实现默认为空,在很多函数的实现内有调用,其作用如下:
As you have observed it does nothing in the default implementation. The function seems to be a placeholder for checking whether a particular thread holds a mutex and optionally abort if it doesn’t. This would be equivalent to the normal asserts we use for variables but applied on mutexes.
I think the reason it is not implemented yet is we don’t have an equivalent light weight function to assert whether a thread holds a lock in pthread_mutex_t used in the default implementation. Some platforms which has that capability could fill this implementation as part of porting process. Searching online I did find some implementation for this function in the windows port of leveldb. I can see one way to implement it using a wrapper class over pthread_mutex_t and setting some sort of a thread id variable to indicate which thread(s) currently holds the mutex, but it will have to be carefully implemented given the race conditions that can arise.
Compaction首先检查imm_,及时将已写满的memtable写入磁盘sstable文件,对Memtable的合并,调用DBImpl::CompactMemTable()完成:
- <span style="color: #0000ff">void</span><span style="color: #000000"> DBImpl::CompactMemTable() {
- mutex_.AssertHeld();
- assert(imm_ </span>!= NULL);<span style="color: #008000">//</span><span style="color: #008000">imm_不能为空</span>
- <span style="color: #000000"> VersionEdit edit;
- Version</span>* <span style="color: #0000ff">base</span> = versions_-><span style="color: #000000">current();
- </span><span style="color: #0000ff">base</span>-><span style="color: #000000">Ref();
- Status s </span>= WriteLevel0Table(imm_, &edit, <span style="color: #0000ff">base</span>);<span style="color: #008000">//</span><span style="color: #008000">将Memtable转化为.sst文件,写入level0 sst table,并写入到edit中</span>
- <span style="color: #0000ff">base</span>-><span style="color: #000000">Unref();
- </span><span style="color: #0000ff">if</span><span style="color: #000000"> (s.ok()) {
- edit.SetPrevLogNumber(</span><span style="color: #800080">0</span><span style="color: #000000">);
- edit.SetLogNumber(logfile_number_); </span><span style="color: #008000">//</span><span style="color: #008000"> Earlier logs no longer needed</span>
- s = versions_->LogAndApply(&edit, &mutex_);<span style="color: #008000">//</span><span style="color: #008000">应用edit中记录的变化,来生成新的版本 </span>
- <span style="color: #000000"> }
- </span><span style="color: #0000ff">if</span><span style="color: #000000"> (s.ok()) {
- </span><span style="color: #008000">//</span><span style="color: #008000"> Commit to the new state</span>
- imm_-><span style="color: #000000">Unref();
- imm_ </span>=<span style="color: #000000"> NULL;
- has_imm_.Release_Store(NULL);
- DeleteObsoleteFiles();
- } </span><span style="color: #0000ff">else</span><span style="color: #000000"> {
- RecordBackgroundError(s);
- }
- }</span>
其中CompactMemTable()主要调用了两个函数:WriteLevel0Table()和versions_->LogAndApply()
CompactMemTable()首先调用WriteLevel0Table(),源码内容如下:
- Status DBImpl::WriteLevel0Table(MemTable* mem, VersionEdit*<span style="color: #000000"> edit,
- Version</span>* <span style="color: #0000ff">base</span><span style="color: #000000">) {
- mutex_.AssertHeld();
- FileMetaData meta;
- meta.number </span>= versions_->NewFileNumber();<span style="color: #008000">//</span><span style="color: #008000">获取新生成的.sst文件的编号</span>
- <span style="color: #000000"> pending_outputs_.insert(meta.number);
- Iterator</span>* iter = mem->NewIterator();<span style="color: #008000">//</span><span style="color: #008000">用于遍历Memtable中的数据</span>
- <span style="color: #000000">
- Status s;
- {
- mutex_.Unlock();
- s </span>= BuildTable(dbname_, env_, options_, table_cache_, iter, &meta);<span style="color: #008000">//</span><span style="color: #008000">创建.sst文件,并将其相关信息记录在meta中</span>
- <span style="color: #000000"> mutex_.Lock();
- }
- </span><span style="color: #0000ff">delete</span> iter; <span style="color: #008000">//</span><span style="color: #008000">iter用完之后一定要删除</span>
- <span style="color: #000000"> pending_outputs_.erase(meta.number);
- </span><span style="color: #0000ff">int</span> level = <span style="color: #800080">0</span><span style="color: #000000">;
- </span><span style="color: #0000ff">if</span> (s.ok() && meta.file_size > <span style="color: #800080">0</span><span style="color: #000000">) {
- </span><span style="color: #0000ff">const</span> Slice min_user_key =<span style="color: #000000"> meta.smallest.user_key();
- </span><span style="color: #0000ff">const</span> Slice max_user_key =<span style="color: #000000"> meta.largest.user_key();
- </span><span style="color: #0000ff">if</span> (<span style="color: #0000ff">base</span> !=<span style="color: #000000"> NULL) {
- level </span>= <span style="color: #0000ff">base</span>->PickLevelForMemTableOutput(min_user_key, max_user_key);<span style="color: #008000">//</span><span style="color: #008000">为合并的输出文件选择合适的level</span>
- <span style="color: #000000"> }
- edit</span>->AddFile(level, meta.number, meta.file_size,meta.smallest, meta.largest);<span style="color: #008000">//</span><span style="color: #008000">将生成的.sst文件加入到该level</span>
- <span style="color: #000000"> }
- </span><span style="color: #0000ff">return</span><span style="color: #000000"> s;
- }</span>
WriteLevel0Table()首先调用BuildTable()将Immutable Memtable中所有的数据写入到一个.sst文件中,并将.sst文件的信息(文件编号,Key值范围,文件大小)记录到变量meta中.由于Memtable是基于Skiplist的,是一个有序表,因此在写入.sst文件时,Key值也是从小到大来排列的.可以发现,将Memtable中的数据转换为SSTable时,是将所有记录都写入SSTable的,要删除的记录也一样.删除操作会在更高level的Compaction中完成.因此level 0中可能会存在Key值相同的记录.2. 然后调用PickLevelForMemTableOutput()为Memtable合并的输出文件选择合适的level,并调用edit->AddFile()将生成的.sst文件加入到该level中.
然后CompactMemTable()调用db文件夹下version_set.cc文件中的versions_->LogAndApply()基于当前版本和更改edit来得到一个新版本.
由之前的分析可知,is_manual默认为false,会调用PickCompaction()来选出要进行合并的level和相应的输入文件.当c->IsTrivialMove()满足时,则直接将文件移动到下一level.
- c = versions_-><span style="color: #000000">PickCompaction();
- Status status;
- </span><span style="color: #0000ff">if</span> (c ==<span style="color: #000000"> NULL) {
- </span><span style="color: #008000">//</span><span style="color: #008000"> Nothing to do</span>
- } <span style="color: #0000ff">else</span> <span style="color: #0000ff">if</span> (!is_manual && c-><span style="color: #000000">IsTrivialMove()) {
- </span><span style="color: #008000">//</span><span style="color: #008000"> Move file to next level</span>
- assert(c->num_input_files(<span style="color: #800080">0</span>) == <span style="color: #800080">1</span><span style="color: #000000">);
- FileMetaData</span>* f = c->input(<span style="color: #800080">0</span>, <span style="color: #800080">0</span><span style="color: #000000">);
- c</span>->edit()->DeleteFile(c->level(), f->number); <span style="color: #008000">//</span><span style="color: #008000">将文件从该层删除</span>
- c->edit()->AddFile(c->level() + <span style="color: #800080">1</span>, f->number, f->file_size, <span style="color: #008000">//</span><span style="color: #008000">将该文件加入到下一level</span>
- f->smallest, f-><span style="color: #000000">largest);
- status </span>= versions_->LogAndApply(c->edit(), &mutex_); <span style="color: #008000">//</span><span style="color: #008000">应用更改,创建新的Version</span>
- }
首先调用db文件夹下version_set.cc文件中的VersionSet::PickCompaction()为接下来的Compaction操作准备输入数据,由之前对Compaction的数据结构分析可知,Compaction操作有两种触发方式:某一level的文件数太多和某一文件的查找次数超过允许值,在进行合并时,将优先考虑文件数过多的情况.
- Compaction*<span style="color: #000000"> VersionSet::PickCompaction() {
- Compaction</span>*<span style="color: #000000"> c;
- </span><span style="color: #0000ff">int</span><span style="color: #000000"> level;
- </span><span style="color: #0000ff">const</span> <span style="color: #0000ff">bool</span> size_compaction = (current_->compaction_score_ >= <span style="color: #800080">1</span>);<span style="color: #008000">//</span><span style="color: #008000">文件数过多</span>
- <span style="color: #0000ff">const</span> <span style="color: #0000ff">bool</span> seek_compaction = (current_->file_to_compact_ != NULL);<span style="color: #008000">//</span><span style="color: #008000">某一文件的查找次数太多</span>
- <span style="color: #0000ff">if</span> (size_compaction) {<span style="color: #008000">//</span><span style="color: #008000">文件数太多优先考虑</span>
- level = current_->compaction_level_; <span style="color: #008000">//</span><span style="color: #008000">要进行Compaction的level</span>
- c = <span style="color: #0000ff">new</span><span style="color: #000000"> Compaction(level);
- </span><span style="color: #008000">//</span><span style="color: #008000">每一层有一个compact_pointer,用于记录compaction key,这样可以进行循环compaction</span>
- <span style="color: #0000ff">for</span> (size_t i = <span style="color: #800080">0</span>; i < current_->files_[level].size(); i++) { <span style="color: #008000">//</span><span style="color: #008000">从待合并的level中选择合适的文件完成合并操作</span>
- FileMetaData* f = current_->files_[level][i]; <span style="color: #008000">//</span><span style="color: #008000">level层中的第i个文件</span>
- <span style="color: #0000ff">if</span> (compact_pointer_[level].empty() || <span style="color: #008000">//</span><span style="color: #008000">compact_pointer_中记录的是下次合并的起始Key值,为空时都可以进行合并</span>
- icmp_.Compare(f->largest.Encode(), compact_pointer_[level]) > <span style="color: #800080">0</span>) { <span style="color: #008000">//</span><span style="color: #008000">或者f的最大Key值大于起始值</span>
- c->inputs_[<span style="color: #800080">0</span>].push_back(f);<span style="color: #008000">//</span><span style="color: #008000">则该文件可以参与合并,将其加入到level输入文件中</span>
- <span style="color: #0000ff">break</span><span style="color: #000000">;
- }
- }
- </span><span style="color: #0000ff">if</span> (c->inputs_[<span style="color: #800080">0</span>].empty()) { <span style="color: #008000">//</span><span style="color: #008000">若level输入为空,则将level的第一个文件加入到输入中</span>
- c->inputs_[<span style="color: #800080">0</span>].push_back(current_->files_[level][<span style="color: #800080">0</span><span style="color: #000000">]);
- }
- } </span><span style="color: #0000ff">else</span> <span style="color: #0000ff">if</span> (seek_compaction) {<span style="color: #008000">//</span><span style="color: #008000">然后考虑查找次数过多的情况</span>
- level = current_-><span style="color: #000000">file_to_compact_level_;
- c </span>= <span style="color: #0000ff">new</span><span style="color: #000000"> Compaction(level);
- c</span>->inputs_[<span style="color: #800080">0</span>].push_back(current_->file_to_compact_);<span style="color: #008000">//</span><span style="color: #008000">将待合并的文件作为level层的输入</span>
- } <span style="color: #0000ff">else</span><span style="color: #000000"> {
- </span><span style="color: #0000ff">return</span><span style="color: #000000"> NULL;
- }
- c</span>->input_version_ =<span style="color: #000000"> current_;
- c</span>->input_version_-><span style="color: #000000">Ref();
- </span><span style="color: #008000">//</span><span style="color: #008000">level 0中的Key值是可以重复的,因此Key值范围可能相互覆盖,把所有重叠都找出来,一起做compaction</span>
- <span style="color: #0000ff">if</span> (level == <span style="color: #800080">0</span><span style="color: #000000">) {
- InternalKey smallest, largest;
- GetRange(c</span>->inputs_[<span style="color: #800080">0</span>], &smallest, &largest);<span style="color: #008000">//</span><span style="color: #008000">待合并的level层的文件的Key值范围</span>
- current_->GetOverlappingInputs(<span style="color: #800080">0</span>, &smallest, &largest, &c->inputs_[<span style="color: #800080">0</span><span style="color: #000000">]);
- assert(</span>!c->inputs_[<span style="color: #800080">0</span><span style="color: #000000">].empty());
- }
- SetupOtherInputs(c);</span><span style="color: #008000">//</span><span style="color: #008000">获取待合并的level+1层的输入</span>
- <span style="color: #0000ff">return</span><span style="color: #000000"> c;
- }</span>
然后判断是否为trivial Compaction,当为trivial Compaction时,只需要简单的将level层的文件移动到level +1 层即可
- <span style="color: #0000ff">bool</span> Compaction::IsTrivialMove() <span style="color: #0000ff">const</span><span style="color: #000000"> {
- </span><span style="color: #0000ff">return</span> (num_input_files(<span style="color: #800080">0</span>) == <span style="color: #800080">1</span> && <span style="color: #008000">//</span><span style="color: #008000">level层只有1个文件</span>
- num_input_files(<span style="color: #800080">1</span>) == <span style="color: #800080">0</span> && <span style="color: #008000">//</span><span style="color: #008000">level+1层没有文件</span>
- TotalFileSize(grandparents_) <= kMaxGrandParentOverlapBytes);<span style="color: #008000">//</span><span style="color: #008000">level+2层文件总大小不超过最大覆盖范围,否则会导致后面的merge需要很大的开销</span>
- }
最终完成完成Compaction操作
- c->edit()->DeleteFile(c->level(), f-><span style="color: #000000">number);
- c</span>->edit()->AddFile(c->level() + <span style="color: #800080">1</span>, f->number, f->file_size,f->smallest, f-><span style="color: #000000">largest);
- status </span>= versions_->LogAndApply(c->edit(), &mutex_);
一般的合并调用DBImpl::DoCompactionWork()完成,compact是调用VersionSet::PickCompacttion()得到的,与之前的trivial Compaction相同.不同level之间,可能存在Key值相同的记录,但是记录的seq不同.由之前的分析可知,最新的数据存放在较低的level中,其对应的seq也一定比level+1中的记录的seq要大,因此当出现相同Key值的记录时,只需要记录第一条记录,后面的都可以丢弃.level 0中也可能存在Key值相同的数据,其后面的seq也不同.数据越新,其对应的seq越大,且记录在level 0中的记录是按照user_key递增,seq递减的方式存储的,则相同user_key对应的记录是聚集在一起的,且按照seq递减的方式存放的.在更高层的Compaction时,只需要处理第一条出现的user_key相同的记录即可,后面的相同user_key的记录都可以丢弃.因此合并后的level +1层的文件中不会存在Key值相同的记录.删除记录的操作也会在此时完成,删除数据的记录会被丢弃,而不会被写入到更高level的文件中.
- Status DBImpl::DoCompactionWork(CompactionState*<span style="color: #000000"> compact) {
- </span><span style="color: #0000ff">if</span><span style="color: #000000"> (snapshots_.empty()) {
- compact</span>->smallest_snapshot = versions_-><span style="color: #000000">LastSequence();
- } </span><span style="color: #0000ff">else</span><span style="color: #000000"> {
- compact</span>->smallest_snapshot = snapshots_.oldest()-><span style="color: #000000">number_;
- }
- mutex_.Unlock();
- </span><span style="color: #008000">//</span><span style="color: #008000">生成iterator:遍历要compaction的数据</span>
- Iterator* input = versions_->MakeInputIterator(compact->compaction);<span style="color: #008000">//</span><span style="color: #008000">用于遍历待合并的每一个文件</span>
- input-><span style="color: #000000">SeekToFirst();
- Status status;
- ParsedInternalKey ikey;
- std::</span><span style="color: #0000ff">string</span><span style="color: #000000"> current_user_key;
- </span><span style="color: #0000ff">bool</span> has_current_user_key = <span style="color: #0000ff">false</span><span style="color: #000000">;
- SequenceNumber last_sequence_for_key </span>=<span style="color: #000000"> kMaxSequenceNumber;
- </span><span style="color: #0000ff">for</span> (; input->Valid() && !<span style="color: #000000">shutting_down_.Acquire_Load(); ) {
- </span><span style="color: #0000ff">if</span> (has_imm_.NoBarrier_Load() != NULL) { <span style="color: #008000">//</span><span style="color: #008000">immutable memtable的优先级最高</span>
- <span style="color: #000000"> mutex_.Lock();
- </span><span style="color: #0000ff">if</span> (imm_ != NULL) { <span style="color: #008000">//</span><span style="color: #008000">当imm_非空时,合并Memtable</span>
- <span style="color: #000000"> CompactMemTable();
- bg_cv_.SignalAll(); </span><span style="color: #008000">//</span><span style="color: #008000"> Wakeup MakeRoomForWrite() if necessary</span>
- <span style="color: #000000"> }
- mutex_.Unlock();
- }
- Slice key </span>= input-><span style="color: #000000">key();
- </span><span style="color: #0000ff">if</span> (compact->compaction->ShouldStopBefore(key) && <span style="color: #008000">//</span><span style="color: #008000">是否需要停止Compaction,中途输出compaction的结果,避免compaction结果和level N+2 files有过多的重叠</span>
- compact->builder !=<span style="color: #000000"> NULL) {
- status </span>=<span style="color: #000000"> FinishCompactionOutputFile(compact, input);
- }
- </span><span style="color: #0000ff">bool</span> drop = <span style="color: #0000ff">false</span><span style="color: #000000">;
- </span><span style="color: #0000ff">if</span> (!ParseInternalKey(key, &<span style="color: #000000">ikey)) {
- current_user_key.clear();
- has_current_user_key </span>= <span style="color: #0000ff">false</span><span style="color: #000000">;
- last_sequence_for_key </span>=<span style="color: #000000"> kMaxSequenceNumber;
- } </span><span style="color: #0000ff">else</span><span style="color: #000000"> {
- </span><span style="color: #0000ff">if</span> (!has_current_user_key || <span style="color: #008000">//</span><span style="color: #008000">获取当前的user_key和sequence</span>
- user_comparator()-><span style="color: #000000">Compare(ikey.user_key,
- Slice(current_user_key)) </span>!= <span style="color: #800080">0</span>) { <span style="color: #008000">//</span><span style="color: #008000">可能存在Key值相同但seq不同的记录
- </span><span style="color: #008000">//</span><span style="color: #008000"> 此时是这个Key第一次出现</span>
- <span style="color: #000000"> current_user_key.assign(ikey.user_key.data(), ikey.user_key.size());
- has_current_user_key </span>= <span style="color: #0000ff">true</span><span style="color: #000000">;
- last_sequence_for_key </span>= kMaxSequenceNumber;<span style="color: #008000">//</span><span style="color: #008000">则将其seq设为最大值,表示第一次出现</span>
- <span style="color: #000000"> }
- </span><span style="color: #0000ff">if</span> (last_sequence_for_key <= compact->smallest_snapshot) {<span style="color: #008000">//</span><span style="color: #008000">表示key已经出现过,否则seq应为KMaxSequenceNumber</span>
- drop = <span style="color: #0000ff">true</span>; <span style="color: #008000">//</span><span style="color: #008000"> (A) </span><span style="color: #008000">//</span><span style="color: #008000">之前已经存在Key值相同的记录,丢弃</span>
- } <span style="color: #0000ff">else</span> <span style="color: #0000ff">if</span> (ikey.type == kTypeDeletion && <span style="color: #008000">//</span><span style="color: #008000">要删除该记录</span>
- ikey.sequence <= compact->smallest_snapshot && <span style="color: #008000">//</span><span style="color: #008000">记录的序号比数据库之前的最小序号还小</span>
- compact->compaction->IsBaseLevelForKey(ikey.user_key)) { <span style="color: #008000">//</span><span style="color: #008000">高的level中没有数据</span>
- drop = <span style="color: #0000ff">true</span>; <span style="color: #008000">//</span><span style="color: #008000">此时要丢弃该记录</span>
- <span style="color: #000000"> }
- last_sequence_for_key </span>= ikey.sequence;<span style="color: #008000">//</span><span style="color: #008000">上次出现的记录对应的sequence,用于判断后面出现相同Key值的情况</span>
- <span style="color: #000000"> }
- </span><span style="color: #0000ff">if</span> (!drop) { <span style="color: #008000">//</span><span style="color: #008000">如果不需要丢弃该记录</span>
- <span style="color: #0000ff">if</span> (compact->builder ==<span style="color: #000000"> NULL) {
- status </span>= OpenCompactionOutputFile(compact);<span style="color: #008000">//</span><span style="color: #008000">若需要,则创建一个.sst文件,用于存放合并后的数据</span>
- <span style="color: #000000"> }
- </span><span style="color: #0000ff">if</span> (compact->builder->NumEntries() == <span style="color: #800080">0</span><span style="color: #000000">) {
- compact</span>->current_output()-><span style="color: #000000">smallest.DecodeFrom(key);
- }
- compact</span>->current_output()-><span style="color: #000000">largest.DecodeFrom(key);
- compact</span>->builder->Add(key, input->value());<span style="color: #008000">//</span><span style="color: #008000">将记录写入.sst文件</span>
- <span style="color: #0000ff">if</span> (compact->builder->FileSize() >=<span style="color: #000000">
- compact</span>->compaction->MaxOutputFileSize()) { <span style="color: #008000">//</span><span style="color: #008000">当.sst文件超过最大值时</span>
- status = FinishCompactionOutputFile(compact, input);<span style="color: #008000">//</span><span style="color: #008000">完成Compaction输出文件</span>
- <span style="color: #000000"> }
- }
- input</span>->Next(); <span style="color: #008000">//</span><span style="color: #008000">处理下一个文件</span>
- <span style="color: #000000"> }
- </span><span style="color: #0000ff">if</span> (status.ok() && compact->builder !=<span style="color: #000000"> NULL) {
- status </span>=<span style="color: #000000"> FinishCompactionOutputFile(compact, input);
- }
- </span><span style="color: #0000ff">if</span><span style="color: #000000"> (status.ok()) {
- status </span>= input-><span style="color: #000000">status();
- }
- </span><span style="color: #0000ff">delete</span><span style="color: #000000"> input;
- input </span>=<span style="color: #000000"> NULL;
- </span><span style="color: #008000">//</span><span style="color: #008000">更新compaction的一些统计数据</span>
- <span style="color: #000000"> CompactionStats stats;
- stats.micros </span>= env_->NowMicros() - start_micros -<span style="color: #000000"> imm_micros;
- </span><span style="color: #0000ff">for</span> (<span style="color: #0000ff">int</span> which = <span style="color: #800080">0</span>; which < <span style="color: #800080">2</span>; which++<span style="color: #000000">) {
- </span><span style="color: #0000ff">for</span> (<span style="color: #0000ff">int</span> i = <span style="color: #800080">0</span>; i < compact->compaction->num_input_files(which); i++<span style="color: #000000">) {
- stats.bytes_read </span>+= compact->compaction->input(which, i)-><span style="color: #000000">file_size;
- }
- }
- </span><span style="color: #0000ff">for</span> (size_t i = <span style="color: #800080">0</span>; i < compact->outputs.size(); i++<span style="color: #000000">) {
- stats.bytes_written </span>+= compact-><span style="color: #000000">outputs[i].file_size;
- }
- mutex_.Lock();
- stats_[compact</span>->compaction->level() + <span style="color: #800080">1</span><span style="color: #000000">].Add(stats);
- </span><span style="color: #0000ff">if</span><span style="color: #000000"> (status.ok()) {
- status </span>= InstallCompactionResults(compact);<span style="color: #008000">//</span><span style="color: #008000">完成合并</span>
- <span style="color: #000000"> }
- </span><span style="color: #0000ff">if</span> (!<span style="color: #000000">status.ok()) {
- RecordBackgroundError(status);
- }
- VersionSet::LevelSummaryStorage tmp;
- Log(options_.info_log,
- </span><span style="color: #800000">"</span><span style="color: #800000">compacted to: %s</span><span style="color: #800000">"</span>, versions_->LevelSummary(&<span style="color: #000000">tmp));
- </span><span style="color: #0000ff">return</span><span style="color: #000000"> status;
- }</span>
首先将可以留下的记录写入到.sst文件中,并将相关信息保存在变量compact中,然后调用InstallCompactionResults()将所做的改动加入到VersionEdit中,再调用LogAndApply()来得到新的版本.
在上面三种不同的Compaction操作中,最终当对当前版本的更改VersionEdit全部完成后,都会调用VersionSet::LogAndApply()来应用更改,创建新版本.edit中保存了level和level+1层要删除和增加的文件.
- Status VersionSet::LogAndApply(VersionEdit* edit, port::Mutex*<span style="color: #000000"> mu) {
- Version</span>* v = <span style="color: #0000ff">new</span> Version(<span style="color: #0000ff">this</span>); <span style="color: #008000">//</span><span style="color: #008000">创建一个新Version</span>
- <span style="color: #000000"> {
- Builder builder(</span><span style="color: #0000ff">this</span>, current_);<span style="color: #008000">//</span><span style="color: #008000">基于当前Version创建一个builder变量</span>
- builder.Apply(edit);<span style="color: #008000">//</span><span style="color: #008000">将edit中记录的要增加、删除的文件加入到builder类中</span>
- builder.SaveTo(v);<span style="color: #008000">//</span><span style="color: #008000">然后将edit中的记录保存到新创建的Version中,这样就得到了一个新的版本</span>
- <span style="color: #000000"> }
- Finalize(v);</span><span style="color: #008000">//</span><span style="color: #008000">根据各层文件数来判断是否还需要进行Compaction</span>
- <span style="color: #000000">
- std::</span><span style="color: #0000ff">string</span><span style="color: #000000"> new_manifest_file;
- Status s;
- </span><span style="color: #0000ff">if</span> (descriptor_log_ == NULL) { <span style="color: #008000">//</span><span style="color: #008000">只会在第一次调用时进入</span>
- assert(descriptor_file_ ==<span style="color: #000000"> NULL);
- new_manifest_file </span>= DescriptorFileName(dbname_, manifest_file_number_);<span style="color: #008000">//</span><span style="color: #008000">创建一个新的Manifest文件</span>
- edit-><span style="color: #000000">SetNextFile(next_file_number_);
- s </span>= env_->NewWritableFile(new_manifest_file, &<span style="color: #000000">descriptor_file_);
- </span><span style="color: #0000ff">if</span><span style="color: #000000"> (s.ok()) {
- descriptor_log_ </span>= <span style="color: #0000ff">new</span><span style="color: #000000"> log::Writer(descriptor_file_);
- s </span>= WriteSnapshot(descriptor_log_);<span style="color: #008000">//</span><span style="color: #008000">快照,系统开始时完整记录数据库的所有信息</span>
- <span style="color: #000000"> }
- }
- {
- mu</span>-><span style="color: #000000">Unlock();
- </span><span style="color: #0000ff">if</span><span style="color: #000000"> (s.ok()) {
- std::</span><span style="color: #0000ff">string</span><span style="color: #000000"> record;
- edit</span>->EncodeTo(&<span style="color: #000000">record);
- s </span>= descriptor_log_->AddRecord(record);<span style="color: #008000">//</span><span style="color: #008000">将数据库的变化记录到Manifest文件中</span>
- <span style="color: #0000ff">if</span><span style="color: #000000"> (s.ok()) {
- s </span>= descriptor_file_-><span style="color: #000000">Sync();
- }
- }
- </span><span style="color: #0000ff">if</span> (s.ok() && !<span style="color: #000000">new_manifest_file.empty()) {
- s </span>=<span style="color: #000000"> SetCurrentFile(env_, dbname_, manifest_file_number_);
- }
- mu</span>-><span style="color: #000000">Lock();
- }
- </span><span style="color: #0000ff">if</span><span style="color: #000000"> (s.ok()) {
- AppendVersion(v); </span><span style="color: #008000">//</span><span style="color: #008000">将新得到的Version插入到所有Version形成的双向链表的尾部</span>
- log_number_ = edit-><span style="color: #000000">log_number_;
- prev_log_number_ </span>= edit-><span style="color: #000000">prev_log_number_;
- }
- }
- </span><span style="color: #0000ff">return</span><span style="color: #000000"> s;
- }</span>
为了重启之后能恢复数据库之前的状态,就需要将数据库的历史变化信息记录下来,这些信息都是记录在Manifest文件中的.为了节省空间和时间,leveldb采用的是在系统开始完整的所有数据库的信息(WriteSnapShot()),以后则只记录数据库的变化,即VersionEdit中的信息(descriptor_log_->AddRecord()).恢复时,只需要根据Manifest中的信息就可以一步步的恢复到上次的状态.
VersionSet::LogAndApply首先创建一个新的Version,然后调用builder.Apply(edit)将edit中所有要删除、增加的文件编号记录下来,其源码如下:
- <span style="color: #008000">//</span><span style="color: #008000"> Apply all of the edits in *edit to the current state.</span>
- <span style="color: #0000ff">void</span> Apply(VersionEdit*<span style="color: #000000"> edit) {
- </span><span style="color: #008000">//</span><span style="color: #008000"> 更新每一层下次合并的起始Key值</span>
- <span style="color: #0000ff">for</span> (size_t i = <span style="color: #800080">0</span>; i < edit->compact_pointers_.size(); i++<span style="color: #000000">) {
- </span><span style="color: #0000ff">const</span> <span style="color: #0000ff">int</span> level = edit-><span style="color: #000000">compact_pointers_[i].first;
- vset_</span>->compact_pointer_[level] =<span style="color: #000000">
- edit</span>-><span style="color: #000000">compact_pointers_[i].second.Encode().ToString();
- }
- </span><span style="color: #008000">//</span><span style="color: #008000">将所有要删除的文件加入到levels_[level].deleted_files变量中</span>
- <span style="color: #0000ff">const</span> VersionEdit::DeletedFileSet& del = edit-><span style="color: #000000">deleted_files_;
- </span><span style="color: #0000ff">for</span> (VersionEdit::DeletedFileSet::const_iterator iter =<span style="color: #000000"> del.begin();
- iter </span>!= del.end();++<span style="color: #000000">iter) {
- </span><span style="color: #0000ff">const</span> <span style="color: #0000ff">int</span> level = iter-><span style="color: #000000">first;
- </span><span style="color: #0000ff">const</span> uint64_t number = iter-><span style="color: #000000">second;
- levels_[level].deleted_files.insert(number);
- }
- </span><span style="color: #008000">//</span><span style="color: #008000"> 将所有新增加的文件加入到levels_[level].added_files中</span>
- <span style="color: #0000ff">for</span> (size_t i = <span style="color: #800080">0</span>; i < edit->new_files_.size(); i++<span style="color: #000000">) {
- </span><span style="color: #0000ff">const</span> <span style="color: #0000ff">int</span> level = edit-><span style="color: #000000">new_files_[i].first;
- FileMetaData</span>* f = <span style="color: #0000ff">new</span> FileMetaData(edit-><span style="color: #000000">new_files_[i].second);
- f</span>->refs = <span style="color: #800080">1</span><span style="color: #000000">;
- f</span>->allowed_seeks = (f->file_size / <span style="color: #800080">16384</span><span style="color: #000000">);
- </span><span style="color: #0000ff">if</span> (f->allowed_seeks < <span style="color: #800080">100</span>) f->allowed_seeks = <span style="color: #800080">100</span><span style="color: #000000">;
- levels_[level].deleted_files.erase(f</span>-><span style="color: #000000">number);
- levels_[level].added_files</span>-><span style="color: #000000">insert(f);
- }
- }</span>
然后VersionSet::LogAndApply再调用builder.SaveTo(v)将更改保存到新的Version中,其源码如下:
- <span style="color: #0000ff">void</span> SaveTo(Version*<span style="color: #000000"> v) {
- BySmallestKey cmp;
- cmp.internal_comparator </span>= &vset_-><span style="color: #000000">icmp_;
- </span><span style="color: #0000ff">for</span> (<span style="color: #0000ff">int</span> level = <span style="color: #800080">0</span>; level < config::kNumLevels; level++<span style="color: #000000">) {
- </span><span style="color: #0000ff">const</span> std::vector<FileMetaData*>& base_files = base_->files_[level];<span style="color: #008000">//</span><span style="color: #008000">当前Version中原有的各个level的.sst文件</span>
- std::vector<FileMetaData*>::const_iterator base_iter =<span style="color: #000000"> base_files.begin();
- std::vector</span><FileMetaData*>::const_iterator base_end =<span style="color: #000000"> base_files.end();
- </span><span style="color: #0000ff">const</span> FileSet* added = levels_[level].added_files;<span style="color: #008000">//</span><span style="color: #008000">对应level新增加的文件</span>
- v->files_[level].reserve(base_files.size() + added-><span style="color: #000000">size());
- </span><span style="color: #0000ff">for</span> (FileSet::const_iterator added_iter = added-><span style="color: #000000">begin();
- added_iter </span>!= added->end();++<span style="color: #000000">added_iter) {
- </span><span style="color: #008000">//</span><span style="color: #008000"> 将原有文件中编号比added小的加入到新的Version</span>
- <span style="color: #0000ff">for</span> (std::vector<FileMetaData*><span style="color: #000000">::const_iterator bpos
- </span>= std::upper_bound(base_iter, base_end, *<span style="color: #000000">added_iter, cmp);
- base_iter </span>!= bpos;++<span style="color: #000000">base_iter) {
- MaybeAddFile(v, level, </span>*<span style="color: #000000">base_iter);
- }
- MaybeAddFile(v, level, </span>*added_iter);<span style="color: #008000">//</span><span style="color: #008000">再将新增的文件依次加入到新的Version</span>
- <span style="color: #000000"> }
- </span><span style="color: #0000ff">for</span> (; base_iter != base_end; ++<span style="color: #000000">base_iter) {
- MaybeAddFile(v, level, </span>*base_iter);<span style="color: #008000">//</span><span style="color: #008000">再将原有文件中剩余的部分加入到新的Version</span>
- <span style="color: #000000"> }
- }
- }</span>
bpos = std::upper_bound(base_iter,base_end,*added_iter,cmp); // 返回base_iter到base_end之间,第一个大于*added_iter的iter.假设原有文件的编号为1、3、4、6、8,新增文件的编号为2、5、7,则第一次循环时,bpos为3对应的迭代器,因此base_iter只遍历一个元素,即将编号1加入到新的Version中.总体对新增文件来说,就是首先加入base中编号比它小的,然后再将其加入,然后再继续比那里下一个新增文件,因此最终得到的文件编号顺序是 1、2、3、4、5、6、7、8,即每一层的.sst文件都是按照编号从小到大排列的.这样就得到了新的Version的每一层的所有文件.
参考文献:
1.http://blog.csdn.net/u012658346/article/details/45787233
2.http://blog.csdn.net/u012658346/article/details/45788939
3.http://blog.csdn.net/joeyon1985/article/details/47154249
4.http://www.blogjava.net/sandy/archive/2012/03/15/leveldb6.html
5.http://www.pandademo.com/2016/04/compaction-of-sstable-leveldb-part-1-source-dissect-9/
LevelDB的源码阅读(四) Compaction操作
标签:一个 check sdn ras sandy dad delete 分析 mmu