从源码分析 MGR 的流控机制( 二 )

首先判断节点的状态信息是否在 m_info 中存在 。如果不存在 , 则插入 。
接着通过 update_member_stats 更新节点的统计信息 。
更新后的统计信息包括以下两部分:

  • 当前数据:如 m_transactions_waiting_certification(当前等待认证的事务数) , m_transactions_waiting_apply(当前等待应用的事务数) 。
  • 上一周期的增量数据:如 m_delta_transactions_certified(上一周期进行认证的事务数) 。
    m_delta_transactions_certified 等于 m_transactions_certified (这一次的采集数据) - previous_transactions_certified (上一次的采集数据)
最后会通过is_flow_control_needed判断是否需要流控 。如果需要流控 , 则会将 m_holds_in_period 自增加 1 。
如果是 Debug 版本 , 且将 log_error_verbosity 设置为 3 。当需要流控时 , 会在错误日志中打印以下信息 。
[Note] [MY-011726] [Repl] Plugin group_replication reported: 'Flow control - update member stats: 127.0.0.1:33071 stats certifier_queue 0, applier_queue 20 certified 387797 (308), applied 387786 (289), local 0 (0), quota 400 (274) mode=1'什么时候会触发流控呢?
接下来我们看看 is_flow_control_needed 函数的处理逻辑 。
bool Pipeline_member_stats::is_flow_control_needed() {  return (m_flow_control_mode == FCM_QUOTA) &&         (m_transactions_waiting_certification >              get_flow_control_certifier_threshold_var() ||          m_transactions_waiting_apply >              get_flow_control_applier_threshold_var());}由此来看 , 触发流控需满足以下条件:
  1. group_replication_flow_control_mode 设置为 QUOTA 。
  2. 当前等待认证的事务数大于 group_replication_flow_control_certifier_threshold 。
    当前等待认证的事务数可通过 performance_schema.replication_group_member_stats 中的 COUNT_TRANSACTIONS_IN_QUEUE 查看 。
  3. 当前等待应用的事务数大于 group_replication_flow_control_applier_threshold 。
    当前等待应用的事务数可通过 performance_schema.replication_group_member_stats 中的 COUNT_TRANSACTIONS_REMOTE_IN_APPLIER_QUEUE 查看 。
除了条件 1 , 条件 2 , 3 满足其一即可 。
当需要流控时 , 会将 m_holds_in_period 自增加 1 。
m_holds_in_period 这个变量会用在 Flow_control_module::flow_control_step 中 。
而 Flow_control_module::flow_control_step 是在 Certifier_broadcast_thread::dispatcher() 中调用的 , 每秒执行一次 。
void Certifier_broadcast_thread::dispatcher() {  ...  while (!aborted) {    ...    applier_module->run_flow_control_step();    ...    struct timespec abstime;    // 定义超时时长 1s 。    set_timespec(&abstime, 1);    mysql_cond_timedwait(&broadcast_dispatcher_cond, &broadcast_dispatcher_lock,                         &abstime);    mysql_mutex_unlock(&broadcast_dispatcher_lock);    broadcast_counter++;  }}void run_flow_control_step() override {  flow_control_module.flow_control_step(&pipeline_stats_member_collector);}

经验总结扩展阅读