时间:2021-07-01 10:21:17 帮助过:12人阅读
DBImpl::RecoverLogFile()
该函数打开指定的log文件,回放日志,用于恢复db。期间可能会执行compaction,生产新的level 0sstable文件,记录文件变动到edit中。
它声明了一个局部类LogReporter以打印错误日志
1 Status DBImpl::RecoverLogFile(uint64_t log_number, bool last_log, 2 bool* save_manifest, VersionEdit* edit, 3 SequenceNumber* max_sequence) { 4 5 mutex_.AssertHeld(); 6 7 //打开指定的log文件 8 std::string fname = LogFileName(dbname_, log_number); 9 SequentialFile* file; 10 Status status = env_->NewSequentialFile(fname, &file); 11 if (!status.ok()) { 12 MaybeIgnoreError(&status); 13 return status; 14 } 15 16 // Create the log reader. 17 LogReporter reporter; 18 19 //根据log文件句柄file创建log::Reader,准备读取log。 20 log::Reader reader(file, &reporter, true/*checksum*/, 21 0/*initial_offset*/); 22 Log(options_.info_log, "Recovering log #%llu", 23 (unsigned long long) log_number); 24 25 // Read all the records and add to a memtable 26 std::string scratch; 27 Slice record; 28 WriteBatch batch; 29 int compactions = 0; 30 MemTable* mem = NULL; 31 //依次读取所有的log记录,并插入到新生成的memtable中。这里使用到了批量更新接口WriteBatch,具体后面再分析。 32 while (reader.ReadRecord(&record, &scratch) && 33 status.ok()) 34 { 35 if (record.size() < 12) {// log数据错误,不满足最小长度12 36 reporter.Corruption( 37 record.size(), Status::Corruption("log record too small")); 38 continue; 39 } 40 WriteBatchInternal::SetContents(&batch, record);// log内容设置到WriteBatch中,因为写日志时一条record就是一个WriteBatch的内容。 41 42 if (mem == NULL) { 43 mem = new MemTable(internal_comparator_); 44 mem->Ref(); 45 } 46 status = WriteBatchInternal::InsertInto(&batch, mem);// 插入到memtable中 47 MaybeIgnoreError(&status); 48 if (!status.ok()) { 49 break; 50 } 51 //恢复出kv对记录中最大的事务序列号 52 const SequenceNumber last_seq = 53 WriteBatchInternal::Sequence(&batch) + 54 WriteBatchInternal::Count(&batch) - 1; 55 if (last_seq > *max_sequence) { 56 *max_sequence = last_seq; 57 } 58 59 if (mem->ApproximateMemoryUsage() > options_.write_buffer_size) 60 { 61 // 如果mem的内存超过设置值write_buffer_size,则执行compaction 62 compactions++; 63 *save_manifest = true; 64 //一旦生成sst便会记录在edit,save_manifest设为ture告诉调用这edit有新的数据 65 status = WriteLevel0Table(mem, edit, NULL); 66 mem->Unref(); 67 mem = NULL; 68 if (!status.ok()) { 69 // Reflect errors immediately so that conditions like full 70 // file-systems cause the DB::Open() to fail. 71 break; 72 } 73 } 74 } 75 76 delete file; 77 78 // See if we should keep reusing the last log file. 79 //是否应该继续使用上次剩下的日志文件 80 if (status.ok() && options_.reuse_logs && last_log && compactions == 0) { 81 //如果option设置可以以及是回放的最后一个日志文件以及回放过程中没有生成新的sst文件才行 82 assert(logfile_ == NULL); 83 assert(log_ == NULL); 84 assert(mem_ == NULL); 85 uint64_t lfile_size; 86 if (env_->GetFileSize(fname, &lfile_size).ok() && 87 env_->NewAppendableFile(fname, &logfile_).ok()) { 88 Log(options_.info_log, "Reusing old log %s \n", fname.c_str()); 89 log_ = new log::Writer(logfile_, lfile_size);//继续使用该日志文件 90 logfile_number_ = log_number; 91 if (mem != NULL) { 92 mem_ = mem;//如果刚才回放生成了memtable,就让db继续使用 93 mem = NULL; 94 } else { 95 // mem can be NULL if lognum exists but was empty. 96 //mem为空说明刚才的日志文件只是存在但没有存放记录。依然创建一个新的mem_ 97 mem_ = new MemTable(internal_comparator_); 98 mem_->Ref(); 99 } 100 } 101 } 102 103 if (mem != NULL) { 104 //扫尾工作,走到这说明不继续使用旧日志文件,因此需要mem刷到新的sstable文件中 105 if (status.ok()) { 106 *save_manifest = true; 107 status = WriteLevel0Table(mem, edit, NULL); 108 } 109 mem->Unref(); 110 } 111 112 return status; 113 }
VersionSet::Recover
当正常运行期间,每当调用LogAndApply的时候,都会将VersionEdit作为一笔记录,追加写入到MANIFEST文件。我们知道VersionEdit就是记录数据库的一个版本到另一个版本间的sst文件变化情况以及各层合并点变化情况。
注意,VersionEdit可以序列化,存进MANIFEST文件,同样道理,MANIFEST中可以将VersionEdit一个一个的重放出来。这个重放的目的,是为了得到当前的Version 以及VersionSet。
一般来讲,当打开的DB的时候,需要获得这种信息,而这种信息的获得,靠的就是所有VersionEdit 按照次序一一回放,生成当前的Version。
1 Status VersionSet::Recover(bool *save_manifest) { 2 // Read "CURRENT" file, which contains a pointer to the current manifest file 3 //读取"CURRENT"文件的内容到current,该文件内容包含了最新的Manifest文件名。 4 std::string current; 5 Status s = ReadFileToString(env_, CurrentFileName(dbname_), ¤t); 6 if (!s.ok()) { 7 return s; 8 } 9 if (current.empty() || current[current.size()-1] != ‘\n‘) { 10 return Status::Corruption("CURRENT file does not end with newline"); 11 } 12 //去掉文件名最后的‘\n‘ 13 current.resize(current.size() - 1); 14 //获取完整的最新Manifest文件名 15 std::string dscname = dbname_ + "/" + current; 16 SequentialFile* file; 17 //打开该Manifest文件 18 s = env_->NewSequentialFile(dscname, &file); 19 if (!s.ok()) { 20 return s; 21 } 22 23 bool have_log_number = false; 24 bool have_prev_log_number = false; 25 bool have_next_file = false; 26 bool have_last_sequence = false; 27 uint64_t next_file = 0; 28 uint64_t last_sequence = 0; 29 uint64_t log_number = 0; 30 uint64_t prev_log_number = 0; 31 Builder builder(this, current_); 32 33 { 34 LogReporter reporter; 35 reporter.status = &s; 36 log::Reader reader(file, &reporter, true/*checksum*/, 0/*initial_offset*/); 37 Slice record; 38 std::string scratch; 39 /*读取MANIFEST内容,MANIFEST是以log的方式写入的,因此这里调用的是log::Reader来读取。 40 然后调用VersionEdit::DecodeFrom,从内容解析出VersionEdit对象,并将VersionEdit记录的改动应用到versionset中。 41 读取MANIFEST中的log number, prev log number, nextfile number, last sequence。*/ 42 while (reader.ReadRecord(&record, &scratch) && s.ok()) { 43 VersionEdit edit; 44 s = edit.DecodeFrom(record); 45 if (s.ok()) { 46 //edit记录的user_comparator名一定要和当前打开数据库传入的user_comparator匹配 47 //否则会出错,因为原来数据库生成的sst文件是按user_comparator排序的 48 if (edit.has_comparator_ && 49 edit.comparator_ != icmp_.user_comparator()->Name()) { 50 s = Status::InvalidArgument( 51 edit.comparator_ + " does not match existing comparator ", 52 icmp_.user_comparator()->Name()); 53 } 54 } 55 /*按照次序,讲Verison的变化量层层回放,最重会得到最终版本的Version*/ 56 if (s.ok()) { 57 builder.Apply(&edit); 58 } 59 60 if (edit.has_log_number_) { 61 log_number = edit.log_number_; 62 have_log_number = true; 63 } 64 65 if (edit.has_prev_log_number_) { 66 prev_log_number = edit.prev_log_number_; 67 have_prev_log_number = true; 68 } 69 70 if (edit.has_next_file_number_) { 71 next_file = edit.next_file_number_; 72 have_next_file = true; 73 } 74 75 if (edit.has_last_sequence_) { 76 last_sequence = edit.last_sequence_; 77 have_last_sequence = true; 78 } 79 } 80 } 81 delete file; 82 file = NULL; 83 84 if (s.ok()) { 85 if (!have_next_file) { 86 s = Status::Corruption("no meta-nextfile entry in descriptor"); 87 } else if (!have_log_number) { 88 s = Status::Corruption("no meta-lognumber entry in descriptor"); 89 } else if (!have_last_sequence) { 90 s = Status::Corruption("no last-sequence-number entry in descriptor"); 91 } 92 93 if (!have_prev_log_number) { 94 prev_log_number = 0; 95 } 96 //将读取到的log number, prev log number标记为已使用。 97 MarkFileNumberUsed(prev_log_number); 98 MarkFileNumberUsed(log_number); 99 } 100 101 if (s.ok()) { 102 Version* v = new Version(this); 103 /*通过回放所有的VersionEdit,得到最终版本的Version,存入v*/ 104 builder.SaveTo(v); 105 // Install recovered version 106 Finalize(v); 107 AppendVersion(v); 108 manifest_file_number_ = next_file; 109 next_file_number_ = next_file + 1; 110 last_sequence_ = last_sequence; 111 log_number_ = log_number; 112 prev_log_number_ = prev_log_number; 113 114 // 我们是否需要创建新的MANIFEST 文件 115 /*随着时间的流逝,发生Compact的机会越来越多,Version跃升的次数越多,自然VersionEdit出现的 116 次数越来越多,而每一个VersionEdit都会记录到MANIFEST,这必然会造成MANIFEST文件不断变大。*/ 117 if (ReuseManifest(dscname, current)) { 118 // No need to save new manifest 119 } else { 120 *save_manifest = true; 121 } 122 } 123 124 return s; 125 }
完.
[leveldb] Recover
标签:expected printf point 持久化 back into enum 批量更新 个数