Date: 2021-07-01 10:21:17
librbd: integrate journaling support for IO operations #6541 (https://github.com/ceph/ceph/pull/6541)
librbd: integrate journaling for maintenance operations #6625 (https://github.com/ceph/ceph/pull/6625) (maintenance operations such as resize and snapshots are also recorded in the journal)
https://github.com/ceph/ceph/pull/6625/commits/cd3d056cb36235e9766c8822ab2168affedd56ef
# rbd feature enable Foo/Bar journaling
This command issues the request to enable journaling:
if (enabled) {
  operation::EnableFeaturesRequest<I> *req =
    new operation::EnableFeaturesRequest<I>(
      m_image_ctx, on_finish, journal_op_tid, features);
  req->send();  // starts the EnableFeaturesRequest state machine
}
The class to focus on is EnableFeaturesRequest.
Its header comment lays out the flow clearly:
* @verbatim
*
* <start>
* |
* v
* STATE_PREPARE_LOCK (send_prepare_lock() locks the image) [1]
* |
* v
* STATE_BLOCK_WRITES (blocks writes) [2]
* |
* v
* STATE_GET_MIRROR_MODE (send_get_mirror_mode() fetches the image's mirror mode, mainly whether the pool mirrors in pool mode or image mode; handle_get_mirror_mode() then performs the required checks: prerequisite features among exclusive-lock, object-map and fast-diff must already be enabled) [3]
* |
* v
* STATE_CREATE_JOURNAL (skip if not [4]
* | required)
* v
* STATE_APPEND_OP_EVENT (skip if journaling [5]
* | disabled)
* v
* STATE_UPDATE_FLAGS
* |
* v
* STATE_SET_FEATURES
* |
* v
* STATE_CREATE_OBJECT_MAP (skip if not
* | required)
* v
* STATE_ENABLE_MIRROR_IMAGE
* |
* V
* STATE_NOTIFY_UPDATE
* |
* | (unblock writes)
* v
* <finish>
* @endverbatim
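The diagram is a linear state machine in which some states are skipped depending on the image's current and requested features. The sketch below is a synchronous model of that flow, not the librbd code: the feature bit values, `check_dependencies()` and `enable_features_plan()` are hypothetical, and the dependency rules (object-map and journaling need exclusive-lock, fast-diff needs object-map) are my assumption of what handle_get_mirror_mode() checks.

```cpp
#include <cassert>
#include <cstdint>
#include <string>
#include <vector>

// Hypothetical feature bits, for illustration only.
constexpr uint64_t kExclusiveLock = 1ULL << 2;
constexpr uint64_t kObjectMap     = 1ULL << 3;
constexpr uint64_t kFastDiff      = 1ULL << 4;
constexpr uint64_t kJournaling    = 1ULL << 6;

// Assumed dependency rules: a requested feature is rejected unless its
// prerequisite is already enabled or requested in the same call.
bool check_dependencies(uint64_t current, uint64_t requested) {
  const uint64_t after = current | requested;
  if ((requested & (kObjectMap | kJournaling)) != 0 && (after & kExclusiveLock) == 0) {
    return false;
  }
  if ((requested & kFastDiff) != 0 && (after & kObjectMap) == 0) {
    return false;
  }
  return true;
}

// Returns the states that would run, in diagram order, applying the
// "skip if" conditions. In the diagram, writes stay blocked until the end.
std::vector<std::string> enable_features_plan(uint64_t current, uint64_t requested) {
  std::vector<std::string> states = {"PREPARE_LOCK", "BLOCK_WRITES", "GET_MIRROR_MODE"};
  if (!check_dependencies(current, requested)) {
    states.push_back("ERROR");  // validation failed: stop here in this sketch
    return states;
  }
  if ((requested & kJournaling) != 0) states.push_back("CREATE_JOURNAL");    // only if required
  if ((current & kJournaling) != 0) states.push_back("APPEND_OP_EVENT");     // only if journaling is on
  states.push_back("UPDATE_FLAGS");
  states.push_back("SET_FEATURES");
  if ((requested & kObjectMap) != 0) states.push_back("CREATE_OBJECT_MAP");  // only if required
  states.push_back("ENABLE_MIRROR_IMAGE");
  states.push_back("NOTIFY_UPDATE");  // writes are unblocked after this
  return states;
}
```

For example, enabling journaling on an image that already has exclusive-lock runs CREATE_JOURNAL but skips APPEND_OP_EVENT, because journaling was not yet on when the request started.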
STATE_CREATE_JOURNAL (step 4) is implemented in journal/CreateRequest.cc:
template <typename I>
void CreateRequest<I>::create_journal() {
  ldout(m_cct, 20) << this << " " << __func__ << dendl;

  ImageCtx::get_timer_instance(m_cct, &m_timer, &m_timer_lock);
  m_journaler = new Journaler(m_op_work_queue, m_timer, m_timer_lock,
                              m_ioctx, m_image_id, m_image_client_id, {});

  using klass = CreateRequest<I>;
  Context *ctx = create_context_callback<klass, &klass::handle_create_journal>(this);
  m_journaler->create(m_order, m_splay_width, m_pool_id, ctx);
}
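The request then proceeds as follows:
1. Create the journal metadata object in the pool.
2. Allocate a tag. A tag acts like a synchronization point: a new tag is allocated whenever a client opens the image, and tags are used to ensure that the image cannot be written to before it has been promoted.
3. Register a client, using the tag allocated in the previous step.
4. Clean up once the steps above have finished.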
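Each of these steps is asynchronous: its completion callback (created with create_context_callback, as in create_journal() above) starts the next step. A minimal synchronous sketch of that chaining pattern, with std::function standing in for Ceph's Context; the step names, `run_chain()` and `create_journal_sketch()` are hypothetical, -5 merely stands in for an error code, and in this model an error skips straight to cleanup.

```cpp
#include <cassert>
#include <cstddef>
#include <functional>
#include <string>
#include <vector>

using Callback = std::function<void(int)>;   // r < 0 means failure
using Step = std::function<void(Callback)>;  // a step reports its result through the callback

// Run steps[i..]: each completion starts the next step; the first error,
// or the end of the list, goes to on_finish. Callbacks are invoked
// synchronously here, so capturing `steps` by reference is safe.
void run_chain(const std::vector<Step>& steps, std::size_t i, Callback on_finish) {
  if (i == steps.size()) {
    on_finish(0);
    return;
  }
  steps[i]([&steps, i, on_finish](int r) {
    if (r < 0) {
      on_finish(r);
      return;
    }
    run_chain(steps, i + 1, on_finish);
  });
}

// Hypothetical model of the create sequence; `fail_at` injects an error
// into one step, and `log` records which steps ran.
int create_journal_sketch(int fail_at, std::vector<std::string>& log) {
  auto make_step = [&log, fail_at](int idx, const std::string& name) -> Step {
    return [&log, fail_at, idx, name](Callback done) {
      log.push_back(name);
      done(idx == fail_at ? -5 : 0);  // -5 stands in for an error code
    };
  };
  std::vector<Step> steps = {make_step(0, "create_metadata"),
                             make_step(1, "allocate_tag"),
                             make_step(2, "register_client")};
  int result = 1;  // overwritten by on_finish
  run_chain(steps, 0, [&log, &result](int r) {
    log.push_back("cleanup");  // runs on success and on failure
    result = r;
  });
  return result;
}
```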
Summary: once the feature has been enabled, the pool containing the image holds an additional object, journal.<image_id>, which stores the journal metadata.
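To make the naming concrete, here is a small helper that builds object names. The "journal.<image_id>" header name follows the summary above; the "journal_data.<pool_id>.<image_id>." prefix for the data objects, and the idea that utils::get_object_name simply appends the object number, are my assumptions about the Ceph journal library, so check them against the source (or with `rados -p <pool> ls`) before relying on them. All function names here are hypothetical.

```cpp
#include <cassert>
#include <cstdint>
#include <string>

// Header (metadata) object: "journal.<image_id>", as described above.
std::string journal_header_oid(const std::string& image_id) {
  return "journal." + image_id;
}

// Assumed data-object prefix: "journal_data.<pool_id>.<image_id>."
std::string journal_object_prefix(int64_t pool_id, const std::string& image_id) {
  return "journal_data." + std::to_string(pool_id) + "." + image_id + ".";
}

// Assumed to behave like utils::get_object_name(prefix, object_num):
// the data object name is the prefix followed by the object number.
std::string journal_object_name(const std::string& prefix, uint64_t object_num) {
  return prefix + std::to_string(object_num);
}
```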
For an image with the journaling feature enabled, a write operation differs from one on an image without it as follows.
The key function is AbstractImageWriteRequest::send_request(), which checks whether journaling is enabled for the image:
template <typename I>
void AbstractImageWriteRequest<I>::send_request() {
  // ... (other code omitted in this excerpt)
  bool journaling = false;
  journaling = (image_ctx.journal != nullptr &&
                image_ctx.journal->is_journal_appending());

  // With journaling enabled, the object requests are collected in `requests`
  // and held back; otherwise they are sent out immediately.
  send_object_requests(object_extents, snapc,
                       (journaling ? &requests : nullptr));

  if (journaling) {
    // in-flight ops are flushed prior to closing the journal
    assert(image_ctx.journal != NULL);
    journal_tid = append_journal_event(requests, m_synchronous);
  }
}
For the details, look at append_journal_event():
append_journal_event
->append_write_event
->append_io_events
The chain ends in Journal<I>::append_io_events. From that function we can see that:
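1. Each write is recorded as an event.
2. Each write enters the journal as a Future.
3. Once the journal entry is safely on disk, C_IOEventSafe is invoked to carry out the actual data write.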
This is not limited to writes: any IO that modifies the image produces journal data. The complete set of event types can be found in:
The callback of C_IOEventSafe, void Journal::handle_io_event_safe(int r, uint64_t tid), then sends out the original requests, completing the write of the original data.
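Putting send_request() and these steps together: the object requests are held back while the write event is appended, and are only dispatched once the journal reports the event safe. A minimal single-threaded sketch of that ordering; `MiniJournal`, `append_event()`, `mark_safe()` and `submit_write()` are hypothetical names, and the real code is asynchronous, uses Future objects, and also handles errors and flushes.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <functional>
#include <map>
#include <string>
#include <utility>
#include <vector>

using ObjectRequest = std::function<void()>;  // stands in for one object write

// Hypothetical, single-threaded stand-in for the journal side.
class MiniJournal {
 public:
  // Record the event and park its object requests under a new tid.
  uint64_t append_event(const std::string& event, std::vector<ObjectRequest> reqs) {
    const uint64_t tid = ++last_tid_;
    events_.push_back(event);
    pending_[tid] = std::move(reqs);
    return tid;
  }

  // Called once the event is durable (the role C_IOEventSafe /
  // handle_io_event_safe plays in librbd): only now are the requests sent.
  void mark_safe(uint64_t tid) {
    auto it = pending_.find(tid);
    if (it == pending_.end()) return;
    std::vector<ObjectRequest> reqs = std::move(it->second);
    pending_.erase(it);
    for (auto& req : reqs) req();
  }

  std::size_t pending() const { return pending_.size(); }
  std::size_t event_count() const { return events_.size(); }

 private:
  uint64_t last_tid_ = 0;
  std::vector<std::string> events_;
  std::map<uint64_t, std::vector<ObjectRequest>> pending_;
};

// Mirrors the branch in send_request(): with journaling the requests go to
// the journal first; without it they are sent immediately.
void submit_write(MiniJournal* journal, std::vector<ObjectRequest> reqs, uint64_t* tid) {
  if (journal != nullptr) {
    *tid = journal->append_event("AioWriteEvent", std::move(reqs));
  } else {
    for (auto& req : reqs) req();
  }
}
```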
In rbd-mirror:
m_replay_handler = new ReplayHandler<I>(this);
m_remote_journaler->start_live_replay(m_replay_handler, poll_seconds);
void Journaler::start_live_replay(ReplayHandler *replay_handler,
                                  double interval) {
  create_player(replay_handler);
  m_player->prefetch_and_watch(interval);
}
void JournalPlayer::fetch(uint64_t object_num) {
  ceph_assert(ceph_mutex_is_locked(m_lock));

  auto object_player = ceph::make_ref<ObjectPlayer>(
    m_ioctx, m_object_oid_prefix, object_num, m_journal_metadata->get_timer(),
    m_journal_metadata->get_timer_lock(), m_journal_metadata->get_order(),
    m_max_fetch_bytes);

  auto splay_width = m_journal_metadata->get_splay_width();
  m_object_players[object_num % splay_width] = object_player;
  fetch(object_player);
}
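fetch(object_num) stores the player in slot object_num % splay_width, so the data objects are read in groups ("object sets") of splay_width objects. A sketch of that arithmetic; the recorder-side mapping (splay offset = entry_tid % splay_width, object number = object_set * splay_width + offset) is my assumption from reading JournalRecorder, not something shown in this article, and all names are hypothetical.

```cpp
#include <cassert>
#include <cstdint>

struct ObjectLocation {
  uint64_t object_set;   // which group of splay_width objects
  uint8_t splay_offset;  // slot within the group (index into m_object_players)
};

// Slot used by JournalPlayer::fetch(object_num) above; splay_width must be > 0.
uint8_t player_slot(uint64_t object_num, uint8_t splay_width) {
  assert(splay_width > 0);
  return static_cast<uint8_t>(object_num % splay_width);
}

// Assumed recorder-side mapping from an entry tid to a data object number.
uint64_t object_for_entry(uint64_t entry_tid, uint64_t object_set, uint8_t splay_width) {
  assert(splay_width > 0);
  const uint8_t offset = static_cast<uint8_t>(entry_tid % splay_width);
  return object_set * splay_width + offset;
}

// Inverse view: which object set and slot a data object belongs to.
ObjectLocation locate(uint64_t object_num, uint8_t splay_width) {
  return {object_num / splay_width, player_slot(object_num, splay_width)};
}
```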
void JournalPlayer::fetch(const ceph::ref_t<ObjectPlayer> &object_player) {
  ceph_assert(ceph_mutex_is_locked(m_lock));

  uint64_t object_num = object_player->get_object_number();
  std::string oid = utils::get_object_name(m_object_oid_prefix, object_num);
  ceph_assert(m_fetch_object_numbers.count(object_num) == 0);
  m_fetch_object_numbers.insert(object_num);

  ldout(m_cct, 10) << __func__ << ": " << oid << dendl;
  C_Fetch *fetch_ctx = new C_Fetch(this, object_num);

  object_player->fetch(fetch_ctx);
}
void ObjectPlayer::fetch(Context *on_finish) {
  ldout(m_cct, 10) << __func__ << ": " << m_oid << dendl;

  if (m_bps_throttle != nullptr) {
    if (m_bps_throttle->get<ObjectPlayer, Context,
                            &ObjectPlayer::handle_bps_throttle_ready>(m_max_fetch_bytes, this, on_finish)) {
      return;
    }
  }

  handle_bps_throttle_ready(0, on_finish);
}

void ObjectPlayer::handle_bps_throttle_ready(int r, Context *on_finish) {
  ldout(m_cct, 10) << __func__ << ": r=" << r << dendl;

  Mutex::Locker locker(m_lock);
  assert(!m_fetch_in_progress);
  m_fetch_in_progress = true;

  C_Fetch *context = new C_Fetch(this, on_finish);
  librados::ObjectReadOperation op;
  op.read(m_read_off, m_max_fetch_bytes, &context->read_bl, NULL);
  op.set_op_flags2(CEPH_OSD_OP_FLAG_FADVISE_DONTNEED);

  librados::AioCompletion *rados_completion =
    librados::Rados::aio_create_completion(context, utils::rados_ctx_callback,
                                           NULL);
  r = m_ioctx.aio_operate(m_oid, rados_completion, &op, 0, NULL);
  assert(r == 0);
  rados_completion->release();
}
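ObjectPlayer::fetch() only issues the read once the optional bandwidth throttle admits m_max_fetch_bytes: if get() returns true the caller returns and the throttle calls handle_bps_throttle_ready() later; otherwise the caller proceeds at once. A simplified token-bucket sketch of that admit-or-defer contract; `ByteThrottle` is hypothetical and is not Ceph's throttle implementation.

```cpp
#include <algorithm>
#include <cassert>
#include <cstdint>
#include <deque>
#include <functional>
#include <utility>

class ByteThrottle {
 public:
  explicit ByteThrottle(uint64_t capacity) : tokens_(capacity), capacity_(capacity) {}

  // Returns false if the bytes are granted now (the caller proceeds itself);
  // returns true if the caller must wait, in which case `ready` runs later
  // from refill(). Requests larger than the capacity would wait forever.
  bool get(uint64_t bytes, std::function<void()> ready) {
    if (waiters_.empty() && tokens_ >= bytes) {
      tokens_ -= bytes;
      return false;
    }
    waiters_.emplace_back(bytes, std::move(ready));
    return true;
  }

  // Add tokens (e.g. from a periodic timer) and wake waiters in FIFO order.
  void refill(uint64_t bytes) {
    tokens_ = std::min(capacity_, tokens_ + bytes);
    while (!waiters_.empty() && tokens_ >= waiters_.front().first) {
      auto waiter = std::move(waiters_.front());
      waiters_.pop_front();
      tokens_ -= waiter.first;
      waiter.second();
    }
  }

 private:
  uint64_t tokens_;
  uint64_t capacity_;
  std::deque<std::pair<uint64_t, std::function<void()>>> waiters_;
};
```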
ObjectPlayer::fetch() reads the object, and when the read completes, the finish() of C_Fetch calls back into handle_fetched(), shown below:
void JournalPlayer::handle_fetched(uint64_t object_num, int r) {
  ldout(m_cct, 10) << __func__ << ": "
                   << utils::get_object_name(m_object_oid_prefix, object_num)
                   << ": r=" << r << dendl;

  Mutex::Locker locker(m_lock);
  assert(m_fetch_object_numbers.count(object_num) == 1);
  m_fetch_object_numbers.erase(object_num);

  if (m_shut_down) {
    return;
  }

  if (r == 0) {
    ObjectPlayerPtr object_player = get_object_player(object_num);
    remove_empty_object_player(object_player);
  }
  process_state(object_num, r);
}
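fetch() and handle_fetched() together maintain a set of in-flight object numbers: a second fetch of the same object while one is outstanding is a bug (hence the asserts), and results that arrive after shutdown are dropped. A single-threaded sketch of that bookkeeping; `FetchTracker` is a hypothetical name and the locking done with m_lock is omitted.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <set>

class FetchTracker {
 public:
  // Mirrors the insert in JournalPlayer::fetch(): at most one fetch per object.
  // Returns false for a duplicate fetch.
  bool start(uint64_t object_num) {
    return in_flight_.insert(object_num).second;
  }

  // Mirrors handle_fetched(): erase first, then decide whether to process.
  // Returns true if the fetched data should be processed.
  bool complete(uint64_t object_num) {
    const bool was_in_flight = in_flight_.erase(object_num) == 1;
    assert(was_in_flight);
    return was_in_flight && !shut_down_;
  }

  void shut_down() { shut_down_ = true; }
  std::size_t in_flight() const { return in_flight_.size(); }

 private:
  std::set<uint64_t> in_flight_;
  bool shut_down_ = false;
};
```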
The call chain from here is:
process_prefetch
->notify_entries_available
->replay_handler->handle_entries_available();
->journal->handle_replay_ready();
->m_journal_replay->process(event_entry, on_ready, on_commit);//void Replay<I>::process(const EventEntry &event_entry)
Replay<I>::process() then dispatches the event through EventVisitor:
boost::apply_visitor(EventVisitor(this, on_ready, on_safe), event_entry.event);
The visitor selects the replay->handle_event() overload that matches the event's type.
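boost::apply_visitor calls the visitor's operator() for whichever alternative the event variant currently holds, and overload resolution then picks the matching handle_event(). The same dispatch with std::variant and std::visit (C++17); the event structs and `Replayer` are simplified, hypothetical stand-ins for journal::AioWriteEvent and friends and for Replay<I>.

```cpp
#include <cassert>
#include <cstdint>
#include <string>
#include <variant>

struct AioWriteEvent { uint64_t offset; uint64_t length; std::string data; };
struct AioFlushEvent {};
struct SnapCreateEvent { std::string snap_name; };

using Event = std::variant<AioWriteEvent, AioFlushEvent, SnapCreateEvent>;

struct Replayer {
  std::string last;  // records which handler ran
  void handle_event(const AioWriteEvent& e) { last = "write@" + std::to_string(e.offset); }
  void handle_event(const AioFlushEvent&) { last = "flush"; }
  void handle_event(const SnapCreateEvent& e) { last = "snap_create:" + e.snap_name; }
};

// Like EventVisitor: forwards the held alternative to the matching overload.
struct EventVisitor {
  Replayer* replay;
  template <typename E>
  void operator()(const E& event) const { replay->handle_event(event); }
};

void process(Replayer& r, const Event& event) {
  std::visit(EventVisitor{&r}, event);
}
```

The template operator() keeps the visitor itself trivial; adding a new event type only requires a new alternative in the variant and a new handle_event() overload.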
For example, for a write event:
template <typename I>
void Replay<I>::handle_event(const journal::AioWriteEvent &event,
                             Context *on_ready, Context *on_safe) {
  CephContext *cct = m_image_ctx.cct;
  ldout(cct, 20) << ": AIO write event" << dendl;

  bufferlist data = event.data;
  bool flush_required;
  auto aio_comp = create_aio_modify_completion(on_ready, on_safe,
                                               io::AIO_TYPE_WRITE,
                                               &flush_required,
                                               {});
  if (aio_comp == nullptr) {
    return;
  }

  io::ImageRequest<I>::aio_write(&m_image_ctx, aio_comp,
                                 {{event.offset, event.length}},
                                 std::move(data), 0, {});
  if (flush_required) {
    m_lock.Lock();
    auto flush_comp = create_aio_flush_completion(nullptr);
    m_lock.Unlock();

    if (flush_comp != nullptr) {
      io::ImageRequest<I>::aio_flush(&m_image_ctx, flush_comp, {});
    }
  }
}
That completes the replay.
(This part is based on other people's write-ups; I have not checked the details myself.)
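A note on flush_required in the write handler: as far as I can tell from Replay.cc, replayed writes do not complete their on_safe contexts by themselves. Once enough of them have accumulated, a flush is issued, and the flush completion fires them, so the commit position only advances for data that is known to be on disk. A sketch of that policy; `ReplayFlushPolicy` and its threshold are hypothetical.

```cpp
#include <cassert>
#include <cstddef>
#include <functional>
#include <utility>
#include <vector>

using OnSafe = std::function<void()>;

class ReplayFlushPolicy {
 public:
  explicit ReplayFlushPolicy(std::size_t low_water_mark) : low_water_mark_(low_water_mark) {}

  // Register a replayed write's on_safe callback; returns true when a flush
  // should be issued (the role of flush_required in handle_event()).
  bool add_modify(OnSafe on_safe) {
    unsafe_.push_back(std::move(on_safe));
    return unsafe_.size() == low_water_mark_;
  }

  // When the flush completes, every pending on_safe fires, which is what
  // lets the commit position move forward.
  void flush_complete() {
    std::vector<OnSafe> ready;
    ready.swap(unsafe_);
    for (auto& cb : ready) cb();
  }

 private:
  std::size_t low_water_mark_;
  std::vector<OnSafe> unsafe_;
};
```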
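While reading the code I kept wondering about one thing: when an RBD image is written locally there is a Journaler, and when rbd-mirror synchronizes there is also a Journaler. So how is journal data deleted? By the local Journaler, or by rbd-mirror? This puzzled me for a long time.
Later, after reading the JournalTrimmer code more carefully, I found that it also relies on a Listener. At the very beginning, multiple clients are registered with the journal; whenever the journal metadata changes, all clients are notified, so each client learns that the metadata has changed and then calls the corresponding update function. See this small piece of core logic in JournalMetadata::handle_refresh_complete: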
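As I understand JournalTrimmer, the rule that resolves the question is this: every registered client (for example the local image's client and the rbd-mirror client) keeps its own commit position in the journal metadata, and objects can only be trimmed once every client has committed past them, so neither side deletes data the other still needs. A simplified sketch of that rule; `TrimSketch` is hypothetical, commit positions are reduced to a single object-set number, and the Listener notification is collapsed into a direct call.

```cpp
#include <algorithm>
#include <cassert>
#include <cstdint>
#include <limits>
#include <map>
#include <string>

// Hypothetical model of trimming driven by client commit positions.
class TrimSketch {
 public:
  explicit TrimSketch(uint8_t splay_width) : splay_width_(splay_width) {}

  // A registered client reports its commit position, simplified here to
  // "all object sets below committed_set are fully committed".
  void update_commit(const std::string& client_id, uint64_t committed_set) {
    clients_[client_id] = committed_set;
    on_metadata_updated();  // stands in for the Listener notification
  }

  // Object sets below this value have been trimmed.
  uint64_t minimum_set() const { return minimum_set_; }
  // First data object number that still exists.
  uint64_t first_live_object() const { return minimum_set_ * splay_width_; }

 private:
  // Only what every registered client has moved past may be removed.
  void on_metadata_updated() {
    if (clients_.empty()) return;
    uint64_t min_set = std::numeric_limits<uint64_t>::max();
    for (const auto& client : clients_) min_set = std::min(min_set, client.second);
    if (min_set > minimum_set_) {
      // Here objects [minimum_set_ * splay_width_, min_set * splay_width_)
      // would be deleted.
      minimum_set_ = min_set;
    }
  }

  uint8_t splay_width_;
  uint64_t minimum_set_ = 0;
  std::map<std::string, uint64_t> clients_;
};
```

With both clients registered, the local client racing ahead does not trim anything until rbd-mirror has also committed past the same object sets.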
A Study of the RBD Journaling Feature