配额的计算逻辑接下来我们重点分析下 flow_control_step 函数的处理逻辑 。
这个函数非常关键 , 它是整个流控模块的核心 。
它主要是用来计算 m_quota_size 和 m_quota_used 。
其中 , m_quota_size 决定了下个周期允许提交的事务数 , 即我们所说的配额 。
m_quota_used 用来统计下个周期已经提交的事务数 , 在该函数中会重置为 0 。
void Flow_control_module::flow_control_step( Pipeline_stats_member_collector *member) { // 这里的 seconds_to_skip 实际上就是 group_replication_flow_control_period , 后面会有定义 。 // 虽然 flow_control_step 是一秒调用一次 , 但实际起作用的还是 group_replication_flow_control_period 。 if (--seconds_to_skip > 0) return; // holds 即 m_holds_in_period int32 holds = m_holds_in_period.exchange(0); // get_flow_control_mode_var() 即 group_replication_flow_control_mode Flow_control_mode fcm = static_cast<Flow_control_mode>(get_flow_control_mode_var()); // get_flow_control_period_var() 即 group_replication_flow_control_period seconds_to_skip = get_flow_control_period_var(); // 计数器 m_stamp++; // 发送当前节点的状态信息 member->send_stats_member_message(fcm); switch (fcm) { case FCM_QUOTA: { // get_flow_control_hold_percent_var() 即 group_replication_flow_control_hold_percent , 默认是 10 // 所以 HOLD_FACTOR 默认是 0.9 double HOLD_FACTOR = 1.0 - static_cast<double>(get_flow_control_hold_percent_var()) / 100.0; // get_flow_control_release_percent_var() 即 group_replication_flow_control_release_percent , 默认是 50 // 所以 RELEASE_FACTOR 默认是 1.5 double RELEASE_FACTOR = 1.0 + static_cast<double>(get_flow_control_release_percent_var()) / 100.0; // get_flow_control_member_quota_percent_var() 即 group_replication_flow_control_member_quota_percent , 默认是 0 // 所以 TARGET_FACTOR 默认是 0 double TARGET_FACTOR = static_cast<double>(get_flow_control_member_quota_percent_var()) / 100.0; // get_flow_control_max_quota_var() 即 group_replication_flow_control_max_quota , 默认是 0 int64 max_quota = static_cast<int64>(get_flow_control_max_quota_var()); // 将上一个周期的 m_quota_size , m_quota_used 赋值给 quota_size , quota_used , 同时自身重置为 0 int64 quota_size = m_quota_size.exchange(0); int64 quota_used = m_quota_used.exchange(0); int64 extra_quota = (quota_size > 0 && quota_used > quota_size) ? quota_used - quota_size : 0; if (extra_quota > 0) { mysql_mutex_lock(&m_flow_control_lock); // 发送一个信号 , 释放 do_wait() 处等待的事务 mysql_cond_broadcast(&m_flow_control_cond); mysql_mutex_unlock(&m_flow_control_lock); } // m_holds_in_period 大于 0 , 则意味着需要进行流控 if (holds > 0) { uint num_writing_members = 0, num_non_recovering_members = 0; // MAXTPS 是 INT 的最大值 , 即 2147483647 int64 min_certifier_capacity = MAXTPS, min_applier_capacity = MAXTPS, safe_capacity = MAXTPS; m_flow_control_module_info_lock->rdlock(); Flow_control_module_info::iterator it = m_info.begin(); // 循环遍历所有节点的状态信息 while (it != m_info.end()) { // 这一段源码中没有 , 加到这里可以直观的看到触发流控时 , 每个节点的状态信息 。#ifndef NDEBUG it->second.debug(it->first.c_str(), quota_size, quota_used);#endif if (it->second.get_stamp() < (m_stamp - 10)) { // 如果节点的状态信息在最近 10 个周期内都没有更新 , 则清掉 m_info.erase(it++); } else { if (it->second.get_flow_control_mode() == FCM_QUOTA) { // 如果 group_replication_flow_control_certifier_threshold 大于 0 , // 且上一个周期进行认证的事务数大于 0 , // 且当前等待认证的事务数大于 group_replication_flow_control_certifier_threshold , // 且上一个周期进行认证的事务数小于 min_certifier_capacity // 则会将上一个周期进行认证的事务数赋予 min_certifier_capacity if (get_flow_control_certifier_threshold_var() > 0 && it->second.get_delta_transactions_certified() > 0 && it->second.get_transactions_waiting_certification() - get_flow_control_certifier_threshold_var() > 0 && min_certifier_capacity > it->second.get_delta_transactions_certified()) { min_certifier_capacity = it->second.get_delta_transactions_certified(); } if (it->second.get_delta_transactions_certified() > 0) // safe_capacity 取 safe_capacity 和 it->second.get_delta_transactions_certified() 中的较小值 safe_capacity = std::min(safe_capacity, it->second.get_delta_transactions_certified()); // 针对的是 applier , 逻辑同 certifier 一样 if (get_flow_control_applier_threshold_var() > 0 && it->second.get_delta_transactions_applied() > 0 && it->second.get_transactions_waiting_apply() - get_flow_control_applier_threshold_var() > 0) { if (min_applier_capacity > it->second.get_delta_transactions_applied()) min_applier_capacity = it->second.get_delta_transactions_applied(); if (it->second.get_delta_transactions_applied() > 0) // 如果上一个周期有事务应用 , 说明该节点不是 recovering 节点 num_non_recovering_members++; } if (it->second.get_delta_transactions_applied() > 0) // safe_capacity 取 safe_capacity 和 it->second.get_delta_transactions_applied() 中的较小值 safe_capacity = std::min( safe_capacity, it->second.get_delta_transactions_applied()); if (it->second.get_delta_transactions_local() > 0) // 如果上一个周期有本地事务 , 则意味着该节点存在写入 num_writing_members++; } ++it; } } m_flow_control_module_info_lock->unlock(); num_writing_members = num_writing_members > 0 ? num_writing_members : 1; // min_capacity 取 min_certifier_capacity 和 min_applier_capacity 的较小值 int64 min_capacity = (min_certifier_capacity > 0 && min_certifier_capacity < min_applier_capacity) ? min_certifier_capacity : min_applier_capacity; // lim_throttle 是最小配额 int64 lim_throttle = static_cast<int64>( 0.05 * std::min(get_flow_control_certifier_threshold_var(), get_flow_control_applier_threshold_var())); // get_flow_control_min_recovery_quota_var() 即 group_replication_flow_control_min_recovery_quota if (get_flow_control_min_recovery_quota_var() > 0 && num_non_recovering_members == 0) lim_throttle = get_flow_control_min_recovery_quota_var(); // get_flow_control_min_quota_var() 即 group_replication_flow_control_min_quota if (get_flow_control_min_quota_var() > 0) lim_throttle = get_flow_control_min_quota_var(); // min_capacity 不能太小 , 不能低于 lim_throttle min_capacity = std::max(std::min(min_capacity, safe_capacity), lim_throttle); // HOLD_FACTOR 默认是 0.9 quota_size = static_cast<int64>(min_capacity * HOLD_FACTOR); // max_quota 是由 group_replication_flow_control_max_quota 定义的 , 即 quota_size 不能超过 max_quota if (max_quota > 0) quota_size = std::min(quota_size, max_quota); // num_writing_members 是有实际写操作的节点数 if (num_writing_members > 1) { // 如果没有设置 group_replication_flow_control_member_quota_percent , 则按照节点数平分 quota_size if (get_flow_control_member_quota_percent_var() == 0) quota_size /= num_writing_members; else // 如果有设置 , 则当前节点的 quota_size 等于 quota_size * group_replication_flow_control_member_quota_percent / 100 quota_size = static_cast<int64>(static_cast<double>(quota_size) * TARGET_FACTOR); } // quota_size 还会减去上个周期超额使用的 quota quota_size = (quota_size - extra_quota > 1) ? quota_size - extra_quota : 1;#ifndef NDEBUG LogPluginErr(INFORMATION_LEVEL, ER_GRP_RPL_FLOW_CONTROL_STATS, quota_size, get_flow_control_period_var(), num_writing_members, num_non_recovering_members, min_capacity, lim_throttle);#endif } else { // 对应 m_holds_in_period = 0 的场景 , RELEASE_FACTOR 默认是 1.5 if (quota_size > 0 && get_flow_control_release_percent_var() > 0 && (quota_size * RELEASE_FACTOR) < MAXTPS) { // 当流控结束后 , quota_size = 上一个周期的 quota_size * 1.5 int64 quota_size_next = static_cast<int64>(quota_size * RELEASE_FACTOR); quota_size = quota_size_next > quota_size ? quota_size_next : quota_size + 1; } else quota_size = 0; } if (max_quota > 0) // quota_size 会取 quota_size 和 max_quota 中的较小值 quota_size = std::min(quota_size > 0 ? quota_size : max_quota, max_quota); // 最后 , 将 quota_size 赋值给 m_quota_size , m_quota_used 重置为 0 m_quota_size.store(quota_size); m_quota_used.store(0); break; } // 如果 group_replication_flow_control_mode 为 DISABLED , // 则会将 m_quota_size 和 m_quota_used 置为 0 , 这个时候会禁用流控 。 case FCM_DISABLED: m_quota_size.store(0); m_quota_used.store(0); break; default: assert(0); } if (local_member_info->get_recovery_status() == Group_member_info::MEMBER_IN_RECOVERY) { applier_module->get_pipeline_stats_member_collector() ->compute_transactions_deltas_during_recovery(); }}
经验总结扩展阅读
- 2023年10月10日买猫吉日一览表 2023年10月10日适合买猫吗
- 2023年10月10日买狗黄道吉日 2023年10月10日是买狗的黄道吉日吗
- 2023年10月10日买鸡黄道吉日 2023年10月10日买鸡好吗
- 孙姓女孩名字2024年10月24日出生的生辰八字五行查询
- 2024年九月廿五出生刘姓女孩名字叫什么生辰八字五行查询
- 2023年2月10日适合买鸭吗 2023年2月10日买鸭好吗
- 2023年2月10日是买狗吉日吗 2023年2月10日买狗好不好
- 2023年2月10日买猫黄道吉日 2023年2月10日是买猫吉日吗
- 2023年2月10日买宠物行吗 2023年2月10日买宠物吉日一览表
- 2023年农历正月廿十砍树吉日 2023年2月10日砍树行吗