mysql 5.6 binlog组提交1

MYSQL_BIN_LOG::ordered_commit(THD *thd, bool all, bool skip_commit) { DBUG_ENTER("MYSQL_BIN_LOG::ordered_commit"); int flush_error= 0; my_off_t total_bytes= 0; bool do_rotate= false; /* These values are used while flushing a transaction, so clear everything. Notes: - It would be good if we could keep transaction coordinator log-specific data out of the THD structure, but that is not the case right now. - Everything in the transaction structure is reset when calling ha_commit_low since that calls st_transaction::cleanup. */ thd->transaction.flags.pending= true; thd->commit_error= THD::CE_NONE; thd->next_to_commit= NULL; thd->durability_property= HA_IGNORE_DURABILITY; thd->transaction.flags.real_commit= all; thd->transaction.flags.xid_written= false; thd->transaction.flags.commit_low= !skip_commit; thd->transaction.flags.run_hooks= !skip_commit; #ifndef DBUG_OFF /* The group commit Leader may have to wait for follower whose transaction is not ready to be preempted. Initially the status is pessimistic. Preemption guarding logics is necessary only when DBUG_ON is set. It won‘t be required for the dbug-off case as long as the follower won‘t execute any thread-specific write access code in this method, which is the case as of current. */ thd->transaction.flags.ready_preempt= 0; #endif DBUG_PRINT("enter", ("flags.pending: %s, commit_error: %d, thread_id: %lu", YESNO(thd->transaction.flags.pending), thd->commit_error, thd->thread_id)); /* Stage #1: flushing transactions to binary log While flushing, we allow new threads to enter and will process them in due time. Once the queue was empty, we cannot reap anything more since it is possible that a thread entered and appointed itself leader for the flush phase. 
*/ if (change_stage(thd, Stage_manager::FLUSH_STAGE, thd, NULL, &LOCK_log)) { DBUG_PRINT("return", ("Thread ID: %lu, commit_error: %d", thd->thread_id, thd->commit_error)); DBUG_RETURN(finish_commit(thd)); } THD *wait_queue= NULL; flush_error= process_flush_stage_queue(&total_bytes, &do_rotate, &wait_queue); my_off_t flush_end_pos= 0; if (flush_error == 0 && total_bytes > 0) flush_error= flush_cache_to_file(&flush_end_pos); /* If the flush finished successfully, we can call the after_flush hook. Being invoked here, we have the guarantee that the hook is executed before the before/after_send_hooks on the dump thread preventing race conditions among these plug-ins. */ if (flush_error == 0) { const char *file_name_ptr= log_file_name + dirname_length(log_file_name); DBUG_ASSERT(flush_end_pos != 0); if (RUN_HOOK(binlog_storage, after_flush, (thd, file_name_ptr, flush_end_pos))) { sql_print_error("Failed to run ‘after_flush‘ hooks"); flush_error= ER_ERROR_ON_WRITE; } signal_update(); DBUG_EXECUTE_IF("crash_commit_after_log", DBUG_SUICIDE();); } /* Stage #2: Syncing binary log file to disk */ bool need_LOCK_log= (get_sync_period() == 1); /* LOCK_log is not released when sync_binlog is 1. It guarantees that the events are not be replicated by dump threads before they are synced to disk. */ if (change_stage(thd, Stage_manager::SYNC_STAGE, wait_queue, need_LOCK_log ? NULL : &LOCK_log, &LOCK_sync)) { DBUG_PRINT("return", ("Thread ID: %lu, commit_error: %d", thd->thread_id, thd->commit_error)); DBUG_RETURN(finish_commit(thd)); } THD *final_queue= stage_manager.fetch_queue_for(Stage_manager::SYNC_STAGE); if (flush_error == 0 && total_bytes > 0) { DEBUG_SYNC(thd, "before_sync_binlog_file"); std::pair<bool, bool> result= sync_binlog_file(false); flush_error= result.first; } if (need_LOCK_log) mysql_mutex_unlock(&LOCK_log); /* Stage #3: Commit all transactions in order. 
This stage is skipped if we do not need to order the commits and each thread have to execute the handlerton commit instead. Howver, since we are keeping the lock from the previous stage, we need to unlock it if we skip the stage. */ if (opt_binlog_order_commits) { if (change_stage(thd, Stage_manager::COMMIT_STAGE, final_queue, &LOCK_sync, &LOCK_commit)) { DBUG_PRINT("return", ("Thread ID: %lu, commit_error: %d", thd->thread_id, thd->commit_error)); DBUG_RETURN(finish_commit(thd)); } THD *commit_queue= stage_manager.fetch_queue_for(Stage_manager::COMMIT_STAGE); DBUG_EXECUTE_IF("semi_sync_3-way_deadlock", DEBUG_SYNC(thd, "before_process_commit_stage_queue");); process_commit_stage_queue(thd, commit_queue); mysql_mutex_unlock(&LOCK_commit); /* Process after_commit after LOCK_commit is released for avoiding 3-way deadlock among user thread, rotate thread and dump thread. */ process_after_commit_stage_queue(thd, commit_queue); final_queue= commit_queue; } else mysql_mutex_unlock(&LOCK_sync); /* Commit done so signal all waiting threads */ stage_manager.signal_done(final_queue); /* Finish the commit before executing a rotate, or run the risk of a deadlock. We don‘t need the return value here since it is in thd->commit_error, which is returned below. */ (void) finish_commit(thd); /* If we need to rotate, we do it without commit error. Otherwise the thd->commit_error will be possibly reset. */ if (do_rotate && thd->commit_error == THD::CE_NONE) { /* Do not force the rotate as several consecutive groups may request unnecessary rotations. NOTE: Run purge_logs wo/ holding LOCK_log because it does not need the mutex. Otherwise causes various deadlocks. */ DEBUG_SYNC(thd, "ready_to_do_rotation"); bool check_purge= false; mysql_mutex_lock(&LOCK_log); int error= rotate(false, &check_purge); mysql_mutex_unlock(&LOCK_log); if (error) thd->commit_error= THD::CE_COMMIT_ERROR; else if (check_purge) purge(); } DBUG_RETURN(thd->commit_error); }



### flush stage


MYSQL_BIN_LOG::process_flush_stage_queue(my_off_t *total_bytes_var,
                                         bool *rotate_var,
                                         THD **out_queue_var)
  DBUG_ASSERT(total_bytes_var && rotate_var && out_queue_var);
  my_off_t total_bytes= 0;
  int flush_error= 1;

  const ulonglong max_udelay= my_atomic_load32(&opt_binlog_max_flush_queue_time);
  const ulonglong start_utime= max_udelay > 0 ? my_micro_time() : 0;

    First we read the queue until it either is empty or the difference
    between the time we started and the current time is too large.

    We remember the first thread we unqueued, because this will be the
    beginning of the out queue.
  bool has_more= true;
  THD *first_seen= NULL;
  while ((max_udelay == 0 || my_micro_time() < start_utime + max_udelay) && has_more)
    std::pair<bool,THD*> current= stage_manager.pop_front(Stage_manager::FLUSH_STAGE);
    std::pair<int,my_off_t> result= flush_thread_caches(current.second);
    has_more= current.first;
    total_bytes+= result.second;
    if (flush_error == 1)
      flush_error= result.first;
    if (first_seen == NULL)
      first_seen= current.second;

    Either the queue is empty, or we ran out of time. If we ran out of
    time, we have to fetch the entire queue (and flush it) since
    otherwise the next batch will not have a leader.
  if (has_more)
    THD *queue= stage_manager.fetch_queue_for(Stage_manager::FLUSH_STAGE);
    for (THD *head= queue ; head ; head = head->next_to_commit)
      std::pair<int,my_off_t> result= flush_thread_caches(head);
      total_bytes+= result.second;
      if (flush_error == 1)
        flush_error= result.first;
    if (first_seen == NULL)
      first_seen= queue;

  *out_queue_var= first_seen;
  *total_bytes_var= total_bytes;
  if (total_bytes > 0 && my_b_tell(&log_file) >= (my_off_t) max_size)
    *rotate_var= true;
  return flush_error;


change_stage(thd, Stage_manager::FLUSH_STAGE, thd, NULL, &LOCK_log)

|–>stage_manager.enroll_for(stage, queue, leave_mutex) //将当前线程加入到m_queue[FLUSH_STAGE]中,如果是队列的第一个线程,就被设置为leader,否则就是follower线程,线程会在其中睡眠,直到被leader唤醒(m_cond_done)



flush_error= process_flush_stage_queue(&total_bytes, &do_rotate, &wait_queue); //只有leader线程才会进入这个逻辑



|–>如果本组确实写入了数据(total_bytes > 0),并且当前binlog文件的写入位置(my_b_tell(&log_file))已达到max_binlog_size(max_size),就设置rotate标记


flush_error= flush_cache_to_file(&flush_end_pos);

|–>将I/O Cache中的内容写到文件中


signal_update()  //通知dump线程有新的Binlog


### sync stage


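change_stage(thd, Stage_manager::SYNC_STAGE, wait_queue, need_LOCK_log ? NULL : &LOCK_log, &LOCK_sync)

|–>stage_manager.enroll_for(stage, queue, leave_mutex)  //当前线程加入到m_queue[SYNC_STAGE]队列中,并释放LOCK_log锁(注意:当sync_binlog=1时,need_LOCK_log为true,此处不释放LOCK_log,而是等sync_binlog_file完成后才释放,以保证dump线程不会在binlog落盘之前就把事件发送出去);同样地,如果是SYNC_STAGE队列的leader,则立刻返回,否则进行condition wait.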



final_queue= stage_manager.fetch_queue_for(Stage_manager::SYNC_STAGE);  //从SYNC_STAGE队列中取出来,并清空队列,主要用于commit阶段


std::pair<bool, bool> result= sync_binlog_file(false);  //刷binlog 文件(如果设置了sync_binlog的话)


简单的理解就是,在flush stage阶段形成N批的组session,在SYNC阶段又会由这N批组产生出新的leader来负责做最耗时的sync操作


### commit stage



当binlog_order_commits关闭时,直接unlock LOCK_sync,由各个session自行进入Innodb commit阶段(随后调用的finish_commit(thd)),这样就无法保证binlog和事务commit的顺序一致,如果你不关注innodb的ibdata中记录的binlog信息,那么可以关闭这个选项来稍微提高点性能


当打开binlog_order_commits时,才会进入commit stage,如下描述的


change_stage(thd, Stage_manager::COMMIT_STAGE,final_queue, &LOCK_sync, &LOCK_commit)



THD *commit_queue= stage_manager.fetch_queue_for(Stage_manager::COMMIT_STAGE);  //取出并清空COMMIT_STAGE队列


process_commit_stage_queue(thd, commit_queue)






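|–>在LOCK_commit保护下,按顺序提交队列中各线程的存储引擎层事务;随后释放LOCK_commit并执行process_after_commit_stage_queue(为了避免用户线程、rotate线程和dump线程之间的三方死锁);最后由stage_manager.signal_done(final_queue)将所有Pending的线程的标记置为false(thd->transaction.flags.pending= false)并做m_cond_done广播,唤醒pending的线程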


(void) finish_commit(thd);  //如果binlog_order_commits设置为FALSE,就会在这一步提交存储引擎层事务; 另外还会更新gtid信息

Innodb的group commit和mariadb的类似,都只有两次sync,即在prepare阶段sync,以及sync Binlog文件(双一配置),为了保证rotate时,所有前一个binlog的事件的redo log都被刷到磁盘,会在函数new_file_impl中调用如下代码段:
if (DBUG_EVALUATE_IF("expire_logs_always", 0, 1)
    && (error= ha_flush_logs(NULL)))
  goto end;

ha_flush_logs 会调用存储引擎接口刷日志文件





