当前位置:Gxlcms > 数据库问题 > [leveldb]2.open/put/delete基本操作介绍

[leveldb]2.open/put/delete基本操作介绍

时间:2021-07-01 10:21:17 帮助过:10人阅读

Status DB::Open(const Options& options, const std::string& dbname, 2 DB** dbptr) { 3 *dbptr = NULL; 4 5 DBImpl* impl = new DBImpl(options, dbname); 6 impl->mutex_.Lock(); 7 VersionEdit edit; 8 // Recover handles create_if_missing, error_if_exists 9 bool save_manifest = false; 10 Status s = impl->Recover(&edit, &save_manifest); 11 if (s.ok() && impl->mem_ == NULL) { 12 // Create new log and a corresponding memtable. 13 uint64_t new_log_number = impl->versions_->NewFileNumber(); 14 WritableFile* lfile; 15 s = options.env->NewWritableFile(LogFileName(dbname, new_log_number), 16 &lfile); 17 if (s.ok()) { 18 edit.SetLogNumber(new_log_number); 19 impl->logfile_ = lfile; 20 impl->logfile_number_ = new_log_number; 21 impl->log_ = new log::Writer(lfile); 22 impl->mem_ = new MemTable(impl->internal_comparator_); 23 impl->mem_->Ref(); 24 } 25 } 26 if (s.ok() && save_manifest) { 27 edit.SetPrevLogNumber(0); // No older logs needed after recovery. 28 edit.SetLogNumber(impl->logfile_number_); 29 s = impl->versions_->LogAndApply(&edit, &impl->mutex_); 30 } 31 if (s.ok()) { 32 impl->DeleteObsoleteFiles(); 33 impl->MaybeScheduleCompaction(); 34 } 35 impl->mutex_.Unlock(); 36 if (s.ok()) { 37 assert(impl->mem_ != NULL); 38 *dbptr = impl; 39 } else { 40 delete impl; 41 } 42 return s; 43 }

1、先通过impl->Recover(&edit, &save_manifest);恢复日志文件

2、通过NewWriteableFile创建新log文件

options.env->NewWritableFile(LogFileName(dbname, new_log_number),
&lfile);

3、创建log和memtable

impl->log_ = new log::Writer(lfile);
impl->mem_ = new MemTable(impl->internal_comparator_);

 

2、put/delete

插入:

Status DB::Put(const WriteOptions& opt, const Slice& key, const Slice& value) {
  WriteBatch batch;        //leveldb中不管单个插入还是多个插入都是以WriteBatch的方式进行的
  batch.Put(key, value);
  return Write(opt, &batch);
}

Delete也类似,只是调用了WriteBatch 的 Delete(key), 这样再内部会以不同的形式编码传递至下一步进行处理。具体的WriteBatch的实现和编码方式在稍后的文章中进行介绍。Delete和Put都调用了Write,,这里的Write是在DBImpl::Write中通过虚函数的形式实现对其调用的,我们接着看Write的流程

 1 Status DBImpl::Write(const WriteOptions& options, WriteBatch* my_batch) {
 2   Writer w(&mutex_);
 3   w.batch = my_batch;
 4   w.sync = options.sync;
 5   w.done = false;
 6     /* 
 7 产生一个Writer对象,然后保存必要的锁、batch、和同步写的相关信息
 8 */
 9   MutexLock l(&mutex_);
10   writers_.push_back(&w);   // 上锁,然后放入待写的队列中
11   while (!w.done && &w != writers_.front()) {
12 /* 这里设计比较特别,需要跟后面的 BuildBatchGroup结合起来看,这里大致的意思是
13    一直等待到这次写完成或者这次写被放在队列的最前面,BuildBatchGroup会将队列  
14    里所有sync设置相同的写请求组成一个WriteBatch进行写入,这里的写请求有可能在
15    别的线程完成写操作了,而是否在队列首的判断是有可能此刻没有其他线程在写循环中,
16    或者本次写请求和前面的写请求的同步设置不一致,那么这种情况就需要自己进入该线
17    程完成写的操作。
18 */
19     w.cv.Wait();
20   }
21   if (w.done) {
22     return w.status;
23   }
24 
25   // 这个函数的主要作用是清理内存表和外存(磁盘)的表使内存表腾出空间插入新的数据
26 // 这里的设计比较复杂设计到leveldb 的很多核心设计,我们这里先大致了解其功能
27   Status status = MakeRoomForWrite(my_batch == NULL);
28   uint64_t last_sequence = versions_->LastSequence();
29   Writer* last_writer = &w;
30   if (status.ok() && my_batch != NULL) {  // NULL batch is for compactions
31     WriteBatch* updates = BuildBatchGroup(&last_writer);
32     WriteBatchInternal::SetSequence(updates, last_sequence + 1);
33     last_sequence += WriteBatchInternal::Count(updates);
34 
35      {
36       mutex_.Unlock();BuildBatchGroup
37    // 这里讲组装好的batch内容写入log,并根据同步设置判断是否同步到磁盘
38       status = log_->AddRecord(WriteBatchInternal::Contents(updates));
39       bool sync_error = false;
40       if (status.ok() && options.sync) {
41         status = logfile_->Sync();
42         if (!status.ok()) {
43           sync_error = true;
44         }
45       }
46       if (status.ok()) {
47     // 写入内存表,这里采用了一个遍历WriteBatch完成插入的方式,稍后分析
48         status = WriteBatchInternal::InsertInto(updates, mem_);
49       }
50       mutex_.Lock();
51       if (sync_error) {
52 // 如果同步错误则记录相应错误信息.
53         RecordBackgroundError(status);
54       }
55     }
56 // 删除在BuildBatch里面设置的零时Batch的内容 
57 if (updates == tmp_batch_) tmp_batch_->Clear();
58 
59     versions_->SetLastSequence(last_sequence);
60   }
61 
62   while (true) {
63 // 唤醒所有等待写入的线程
64     Writer* ready = writers_.front();
65     writers_.pop_front();
66     if (ready != &w) {
67       ready->status = status;
68       ready->done = true;
69       ready->cv.Signal();
70     }
71     if (ready == last_writer) break;
72   }
73 
74   // Notify new head of write queue,因为可能请求时不在队首而进入了等待状态,
75 // 这样唤醒他使其成为新的队首写线程,进行MakeRoomForWrite等一系列操作
76   if (!writers_.empty()) {
77     writers_.front()->cv.Signal();
78   }
79 
80   return status;
81 }

所以从流程可以清晰的看到插入删除的流程主要为:

1. 将这条KV记录以顺序写的方式追加到log文件末尾;

2. 将这条KV记录插入内存中的Memtable中,在插入过程中如果刚好后台进程在compaction会短暂停顿以为后台进程compaction腾出时间及cpu

这里涉及到一次磁盘读写操作和内存SkipList的插入操作,但是这里的磁盘写时文件的顺序追加写入效率是很高的,所以并不会导致写入速度的降低;

而且从流程分析我们知道,在插入(删除)过程中如果多线程同时进行,那么这些操作将会将操作的同步设置相同的相邻的操作合并为一个批插入,这样可以使整个系统的总吞吐量更大。所以一次插入记录操作只会等待一次磁盘文件追加写和内存SkipList插入操作,这是为何leveldb写入速度如此高效的根本原因。

我们这里讲插入和删除以等同的方式进行了介绍,可能有的朋友会觉得奇怪,删除不是需要查找到插入的原始记录的么?而leveldb进行了一个巧妙的将随机读写,转换为顺序读写的方式,那就是其并不存在立即删除的操作,而是与插入操作相同将插入操作插入的是Key:Value值改为删除操作插入的是“Key:删除标记”,并不真正立即去删除记录,而是后台Compaction的时候才去做对应的真正的删除操作,这又极大的提高了leveldb的效率。

[leveldb]2.open/put/delete基本操作介绍

标签:原因   mtab   清理内存   exists   ack   res   class   方式   进程   

人气教程排行