本文共 8361 字,大约阅读时间需要 27 分钟。
目录问题
关于半同步复制
代码层面解释上面几个问题
1 问题
对于MySQL半同步复制,了解MySQL的人,都肯定能说出一二,比如和异步复制、同步复制的差别、超时退化为异步复制和无损复制等,但是这面有以下几个问题(基于MySQL版本为社区版5.7.21):主库在什么时候调用半同步插件?
主库以什么为单位发送binlog日志?
主库会对二进制日志做额外处理吗?
从库以什么为单位来处理发送过来的日志信息
从库什么时候发送ACK给到主库?
当主库设置了多个ACK后,主库怎么判断收到的ACK的个数,即确认的位置点?
对于上面的几个问题,这面进行简单的回答,所以如果不想看其他部分的,直接看完这一部分就可以了。主库在什么时候调用半同步插件?
MySQL在server层引入了组提交后,为了提高并行度,将提交阶段分为了flush、sync和commit阶段,根据sync_binlog设置的不同,会在flush阶段(sync_binlog设置为非1)或sync阶段(sync_binlog设置为1)以更新binlog位置点的方式通知dump线程发送binlog,而在commit阶段等待从库的ACK应答情况,这面又分为两个不同的等待位置点,即rpl_semi_sync_master_wait_point参数是after_sync还是after_commit。
主库以什么为单位发送binlog日志?
主库会以event为单位将binlog日志写入到net_buffer里面,但是并不会立即发送,而是达到条件后一起发送给从库。
主库会对二进制日志做额外处理吗?
答案是肯定的,主库读取每个event后,在发送之前都会在event头部添加两个字节的内容,第一个字节是固定的0xef,第二个字节表示是否需要从库收到后发送ACK回包信息,所以对于从库的ACK的回包是以组提交里面一组事务为单位(之前有同事问我ACK确认包以什么为单位,我这面回答是event为单位,羞愧)。
从库以什么为单位来处理发送过来的日志信息?
从库接收到主库发送过来的binlog日志后,也是通过event为单位进行读取、根据sync_relay_log设置写入到ralaylog文件里面。
从库什么时候发送ACK给到主库?
问题3里面已经简单介绍了主库对event的处理,所以从库在读取发送过来的event后,根据前两个字节判断是否需要发送ACK包。
当主库设置了多个ACK后,主库怎么判断收到的ACK的个数?
主库这面维护了一个ACK数组,数组的大小设置为rpl_semi_sync_master_wait_for_slave_count值N-1,当rpl_semi_sync_master_wait_for_slave_count为1时,用不到该数组。数组里面记录了以N-1个以从库的server_id为标识,确认的位置点,当收到一个新的ACK包时,通过server_id进行判断,如果不在数组里面,并且数组已经满了,说明收到了N个确认包,找到最小的确认位置点,等待在这个位置点或者之前的所有连接均可以返回成功。
2 关于半同步复制
下面简单介绍一下半同步复制,MySQL的半同步复制是在MGR出来之前的平衡性能和数据一致性的最好的解决方案,尤其在组提交、无损复制和并行复制出来之后。对于并行复制,writeset的判断方式的出现更是解决了主库并发不高的情况下,从库的并行回放的问题(MySQL在5.7.22和8.0.1里面添加了binlog_transaction_dependency_tracking参数用来标记主库通过什么方式来判断事务是否能在从库并行回放,同事测试发现对于设置成WRITESET会比COMMIT_ORDER TPS损耗10%左右,因为这面如果设置成WRITESET,系统会在原来的基础上多加一次通过hash表last_committed值的计算。而当binlog_transaction_dependency_history_size的值,即hash表的个数设置为默认25000和10时,测试TPS发现,设置为10的TPS却比25000的更高,查看代码发现hash表的实现时通过std::map实现,而std::map是通过红黑树实现,这面会需要对比更多的次数,导致性能差一些,后面估计会改成hash_map来实现,这面不具体说明)。MySQL的半同步插件、GTID等这些特性,使得实现一套高可用的方案更加简单,高可用软件只需要解决主节点可用性的检测、通过GTID的对比选择提升哪个从库为新主和最后一个事务的处理等问题即可。当然实现的过程中,需要注意很多细节,比如半同步插件rpl_semi_sync_master_wait_for_slave_count、rpl_semi_sync_master_timeout值的设置。对于数据一致性要求比较高的业务场景,都会避免半同步因为从库长时间没有回应ACK包退化为异步复制,所以将rpl_semi_sync_master_timeout设置为一个比较大的数字,使得半同步不退化,而对于rpl_semi_sync_master_wait_for_slave_count参数,则需要应场景设置,表示需要多少个从库收到事务的日志主库才返回给客户端成功。虽然半同步插件已经解决了一致性和性能大部分问题,但是相比较于MySQL的复制的最新的解决方案MGR,还是问题比较多,比如多写的问题,可以说MGR是复制的最终解决方案(现在MGR还是不够成熟,之前使用的时候就遇到过因为网络问题,导致某个节点上面的GTID对应的事务和别的节点上面的不同的问题,最终导致数据不一致的问题)。对于开源的高可用解决方案里面借助半同步来做高可用的也是很多,比如青云的分布式数据库RadonDB的下面的存储节点,GitHub上面介绍的图如下:
这面Raft算法是借助GTID来进行选主,而不是为了数据和同步日志,日志的同步这面还是用了半同步复制,将rpl_semi_sync_master_wait_for_slave_count设置为1,正常写入后,肯定会有一个从库收到了事务的日志信息,并且将rpl_semi_sync_master_timeout设置为999999,可以认为无限大,不会发生退化为异步的问题。
3 代码层面解释上面几个问题
下面根据MySQL代码解释上面的几个问题,再次强调参考的代码为MySQL社区版5.7.21。
我们先查看主库的处理逻辑,首先查看sql/http://binlog.cc里面的提交的逻辑,下面贴一下主要代码:
int MYSQL_BIN_LOG::ordered_commit(THD *thd, bool all, bool skip_commit)
{
# flush 阶段 将binlog刷新到buff里面,并且刷新到文件缓存
// 将flush阶段的链表里面所有的事物日志写入到binlog的buff里面,这面也会生成gtid_event,所以解析binlog会发现gtid_event时间是在后面query_event后面 flush_error= process_flush_stage_queue(&total_bytes, &do_rotate, &wait_queue);
// 将binlog刷新到文件缓存里面 flush_error= flush_cache_to_file(&flush_end_pos);
// 判断sync_binlog是否为1 update_binlog_end_pos_after_sync= (get_sync_period() == 1);
// 观察者模式,调用Binlog_storage_observer里面的repl_semi_report_binlog_update函数,将文件名和pos点记录下来 // file_name_ptr 当前写入的binlog文件 // flush_end_pos 组提交flush链表里面所有binlog最后的pos点 RUN_HOOK(binlog_storage, after_flush,(thd, file_name_ptr, flush_end_pos)));
// update_binlog_end_pos_after_sync不为1,这面就更新binlog文件的最后位置点,唤醒dump线程发送binlog if (!update_binlog_end_pos_after_sync)
update_binlog_end_pos();
# sync阶段 将binlog内容刷新到磁盘 // 如果sync_binlog为1 才会走到下面 if (update_binlog_end_pos_after_sync)
{
// 找到sync链表里面最后一个事务 while (tmp_thd->next_to_commit != NULL)
tmp_thd= tmp_thd->next_to_commit;
// 更新binlog文件的最后位置点,唤醒dump线程发送binlog update_binlog_end_pos(tmp_thd->get_trans_pos());
}
# commit阶段 提交所有的事务 // 如果rpl_semi_sync_master_wait_point的值设置为after_sync,在这面等待从库的ACK sync_error= call_after_sync_hook(commit_queue);
// 遍历commit链表里面的事务进行提交 process_commit_stage_queue(thd, commit_queue);
// 如果rpl_semi_sync_master_wait_point的值设置为after_commit,在这面等待从库的ACK process_after_commit_stage_queue(thd, commit_queue);
}
可以发现上面对于sync_binlog不同的值,通知dump线程发送binlog的位置点也是不一样的:sync_binlog设置为1: 在sync阶段通知dump 线程发送binlog,这时候binlog确定已经刷盘,因为在flush阶段只是将binlog刷新到了文件缓存,如果发生了宕机,有可能丢失,这个时候如果从库已经收到了,导致从库比主库数据多。
sync_binlog设置为非1: 在flush阶段通知dump线程发送binlog。
对于上面说的咋flush阶段调用的Binlog_storage_observer里面的repl_semi_report_binlog_update函数,里面的调用如下:
--repl_semi_report_binlog_update(Binlog_storage_param *param, const char *log_file, my_off_t log_pos)
----repl_semisync.writeTranxInBinlog(log_file, log_pos);
------active_tranxs_->insert_tranx_node // 将位点信息保存到active_tranxs里面
对于dump线程发送binlog日志的逻辑,调用的逻辑如下:
--Binlog_sender::run()
----send_binlog(&log_cache, start_pos)
------get_binlog_end_pos(log_cache) // 获取binlog最后的位置,如果没有更新会一直卡在这面------send_events(log_cache, end_pos) // 发送events--------read_event(log_cache, m_event_checksum_alg,&event_ptr, &event_len)) // 以event为单位读取--------before_send_hook(log_file, log_pos)
----------RUN_HOOK(binlog_transmit, before_send_event,(m_thd, m_flag, &m_packet, log_file, log_pos))) // 调用binlog_transmit观察者的repl_semi_before_send_event函数------------repl_semisync.updateSyncHeader 为每个event添加header
--------send_packet() // 写入到net_buff里面
下面重点说明一下repl_semisync.updateSyncHeader函数,部分代码如下:
int ReplSemiSyncMaster::updateSyncHeader(unsigned char *packet,
const char *log_file_name,
my_off_t log_file_pos,
uint32 server_id)
{
// 判断读到的log_file_name和log_file_pos是否需要设置发送ACK sync = active_tranxs_->is_tranx_end_pos(log_file_name,log_file_pos);
if (sync)
{
// 设置Heaer (packet)[2] = kPacketFlagSync;
}
}
bool ActiveTranx::is_tranx_end_pos(const char *log_file_name, my_off_t log_file_pos)
{
// 计算出hash值 unsigned int hash_val = get_hash_value(log_file_name, log_file_pos);
// 获取hash表里面的hash_val桶对应的链表头部,因为有可能多个log_file_name和log_file_pos计算出的hash值是一样的,trx_htb_里面存储的都是上面active_tranxs_->insert_tranx_node加入的位点信息 TranxNode *entry = trx_htb_[hash_val];
while (entry != NULL)
{
if (compare(entry, log_file_name, log_file_pos) == 0)
break;
entry = entry->hash_next_;
}
// 如果找到的话说明需要设置头部的ACK应答信息 return (entry != NULL);
}
对于从库的接收发送过来的二进制文件,以及处理逻辑,见http://rpl_slave.cc里面的handle_slave_io函数,里面的调用的逻辑为:
--read_event(mysql, mi, &suppress_warnings) // 读取event--RUN_HOOK(binlog_relay_io, after_read_event,(thd, mi,(const char*)mysql->net.read_pos + 1,
event_len, &event_buf, &event_len))) // 调用binlog_relay_io观察者里面的repl_semi_slave_read_event函数--queue_event(mi, event_buf, event_len) // 将读取到的event保存到relaylog--RUN_HOOK(binlog_relay_io, after_queue_event, (thd, mi, event_buf, event_len, synced)) // 调用binlog_relay_io观察者里面repl_semi_slave_queue_event函数
在repl_semi_slave_read_event函数里面,会首先读取event的前两个字节,用来判断是否发送ACK
if ((unsigned char)(header[0]) == kPacketMagicNum) // kPacketMagicNum 赋值为0xef{
*need_reply = (header[1] & kPacketFlagSync); // kPacketFlagSync 赋值为0x01 *payload_len = total_len - 2;
*payload = header + 2;
}
在repl_semi_slave_queue_event函数里面,会选判断是否需要发送ACK,如下:
if (rpl_semi_sync_slave_status && semi_sync_need_reply)
{
// 发送ACK包,并且带上已经读取和写入到relaylog里面的文件名和位置点 (void) repl_semisync.slaveReply(param->mysql,
param->master_log_name,
param->master_log_pos);
}
对于主库的ACK线程的处理逻辑如下:
--Ack_receiver::run()
----repl_semisync.reportReplyPacket(m_slaves[i].server_id(), net.read_pos, len) // 收到ACK包------handleAck(server_id, log_file_name, log_file_pos)
--queue_event(mi, event_buf, event_len) // 将读取到的event保存到relaylog--RUN_HOOK(binlog_relay_io, after_queue_event, (thd, mi, event_buf, event_len, synced)) // 调用binlog_relay_io观察者里面repl_semi_slave_queue_event函数
对于handleAck函数,里面的处理逻辑
// 如果rpl_semi_sync_master_wait_for_slave_count设置为1,不需要别的处理逻辑if (rpl_semi_sync_master_wait_for_slave_count == 1)
reportReplyBinlog(log_file_name, log_file_pos);
else
{
// 如果rpl_semi_sync_master_wait_for_slave_count>1 const AckInfo *ackinfo= NULL;
// 将获取到的从库的ACK信息,插入到ack_container_数组里面,这面是以server_id为唯一值,如果数组里面有server_id只需更新里面的文件名和位置点,如果没有且数组没有满的话插入进去,如果满了说明收到了rpl_semi_sync_master_wait_for_slave_count个数的ACK,所以获取数组里面最小的文件和位置点返回 ackinfo= ack_container_.insert(server_id, log_file_name, log_file_pos);
// 如果ackinfo不为空,说明ack_container_已经满了,说明主库已经收到足够的ACK信息,可以调用reportReplyBinlog if (ackinfo != NULL)
reportReplyBinlog(ackinfo->binlog_name, ackinfo->binlog_pos);
}
对于上面一直通过文件名和位置点进行对比,会不会有问题呢?答案是否定的,因为binlog的文件名是递增的,如果没有执行reset master命令,这个文件名会一直增加,当然到了超过了最大值之后(2^31-1),会报错,需要重启数据库。
转载地址:http://wrqsx.baihongyu.com/