RocketMQ源码系列(8) — 消息存储之主从复制
这篇文章我们来看下 RocketMQ 的高可用机制之一的主从复制模式,所谓的主从复制就是一主多从模式,生产者向 Master 节点写入数据,然后将数据同步给 Slave 节点做数据备份,消费者可以从 master 或 slave 节点消费数据。RocketMQ 默认配置下是开启主从复制,核心组件为 HAService
,功能就是将 master 数据复制到 slave 节点。但其不支持主从自动切换,这种模式下,master 宕机后,集群将不能继续写入消息,但可以从 slave 节点消费消息。
HA 主从复制
HA Master 主节点
1、HAService 核心组件
在 DefaultMessageStore 的构造方法中,会根据是否启用DLedger
技术来判断使用哪种类型的高可用机制,DLeger 技术默认是关闭的,也就是默认启用 HAService 高可用机制。
if (!messageStoreConfig.isEnableDLegerCommitLog()) {
this.haService = new HAService(this);
} else {
this.haService = null;
}
HAService 组件:
List<HAConnection>
:主从建立的网络连接,一个 master 可能有多个 slave,每一个 HAConnection 就代表一个 slave 节点AcceptSocketService
:接收slave节点的 Socket 服务GroupTransferService
:组传输的组件HAClient
:主从同步客户端组件WaitNotifyObject
:线程阻塞,等待通知组件connectionCount
:连接到 master 的slave数量push2SlaveMaxOffset
:推送到slave的最大偏移量
在 DefaultMessageStore 的 start()
方法中,会调用 HAService 的 start() 方法来启动这些组件。
public class HAService {
private final AtomicInteger connectionCount = new AtomicInteger(0);
private final List<HAConnection> connectionList = new LinkedList<>();
private final AcceptSocketService acceptSocketService;
private final DefaultMessageStore defaultMessageStore;
private final GroupTransferService groupTransferService;
private final HAClient haClient;
private final WaitNotifyObject waitNotifyObject = new WaitNotifyObject();
private final AtomicLong push2SlaveMaxOffset = new AtomicLong(0);
public HAService(final DefaultMessageStore defaultMessageStore) throws IOException {
this.defaultMessageStore = defaultMessageStore;
// HA 监听端口 10912
this.acceptSocketService = new AcceptSocketService(defaultMessageStore.getMessageStoreConfig().getHaListenPort());
this.groupTransferService = new GroupTransferService();
this.haClient = new HAClient();
}
public void start() throws Exception {
this.acceptSocketService.beginAccept();
this.acceptSocketService.start();
this.groupTransferService.start();
this.haClient.start();
}
}
需要注意的是,HA 监听端口 haListenPort
默认 10912
,但是 BrokerStartup 在创建 BrokerController 时,会设置 haListenPort = listenPort + 1
,listenPort 就是 Netty 的监听端口。
比如我配置 listenPort=30911
,那么 haListenPort=30912
。
messageStoreConfig.setHaListenPort(nettyServerConfig.getListenPort() + 1);
2、Master监听Slave连接请求
AcceptSocketService 就是用来监听 slave 的连接,然后创建 HAConnection,默认监听的端口是 listenPort + 1
。
在 beginAccept()
中就会基于 NIO 去建立监听通道,注册多路复用器 Selector,来监听 ACCEPT
连接请求。
AcceptSocketService 也是一个 ServiceThread
,在它的 run 方法中,就是在不断监听slave节点的连接请求,有TCP连接请求过来后,就用连接通道 SocketChannel 创建一个 HAConnection
,并启动这个连接,然后将其添加到 HAService 的连接集合 List<HAConnection>
中。
class AcceptSocketService extends ServiceThread {
private final SocketAddress socketAddressListen;
private ServerSocketChannel serverSocketChannel;
private Selector selector;
public AcceptSocketService(final int port) {
this.socketAddressListen = new InetSocketAddress(port);
}
public void beginAccept() throws Exception {
// 基于 NIO 建立连接监听
this.serverSocketChannel = ServerSocketChannel.open();
// 打开一个多路复用器
this.selector = RemotingUtil.openSelector();
// 设置socket重用地址true
this.serverSocketChannel.socket().setReuseAddress(true);
// 绑定监听端口
this.serverSocketChannel.socket().bind(this.socketAddressListen);
// 非阻塞模式
this.serverSocketChannel.configureBlocking(false);
// 注册到多路复用器,监听连接请求
this.serverSocketChannel.register(this.selector, SelectionKey.OP_ACCEPT);
}
@Override
public void run() {
while (!this.isStopped()) {
try {
// 监听连接到达事件
this.selector.select(1000);
// 连接来了,拿到 SelectionKey
Set<SelectionKey> selected = this.selector.selectedKeys();
if (selected != null) {
for (SelectionKey k : selected) {
if ((k.readyOps() & SelectionKey.OP_ACCEPT) != 0) {
// 通过 accept 函数完成TCP连接,获取到一个网络连接通道 SocketChannel
SocketChannel sc = ((ServerSocketChannel) k.channel()).accept();
if (sc != null) {
HAConnection conn = new HAConnection(HAService.this, sc);
conn.start();
HAService.this.addConnection(conn);
}
}
}
selected.clear();
}
} catch (Exception e) {}
}
}
}
HA Slave 从节点
1、HAClient
AcceptSocketService 是 Master 节点用来接收 Slave 连接请求的,HAClient 就是对应的 Slave 客户端,会去连接 Master 节点。
HAClient
有如下一些属性,主从复制数据时都会用到。
- masterAddress:master 节点的地址
- reportOffset:上报的偏移量
- currentReportedOffset:当前上报的偏移量
- dispatchPosition:分发的位置
- byteBufferRead:读数据缓冲区
- byteBufferBackup:备份数据缓冲区
- lastWriteTimestamp:最近写入数据的时间
class HAClient extends ServiceThread {
// 读数据缓冲区大小 4MB
private static final int READ_MAX_BUFFER_SIZE = 1024 * 1024 * 4;
// Master 地址
private final AtomicReference<String> masterAddress = new AtomicReference<>();
// NIO 多路复用器
private final Selector selector;
// 与 Master 的连接通道
private SocketChannel socketChannel;
// 从节点收到数据后会返回一个 8 字节的 ack 偏移量
private final ByteBuffer reportOffset = ByteBuffer.allocate(8);
// 写数据时间
private long lastWriteTimestamp = System.currentTimeMillis();
// 当前上报过的偏移量
private long currentReportedOffset = 0;
// 分发的位置
private int dispatchPosition = 0;
// 读数据缓冲区
private ByteBuffer byteBufferRead = ByteBuffer.allocate(READ_MAX_BUFFER_SIZE);
// 备份数据缓冲区
private ByteBuffer byteBufferBackup = ByteBuffer.allocate(READ_MAX_BUFFER_SIZE);
public HAClient() throws IOException {
this.selector = RemotingUtil.openSelector();
}
}
HAClient 也是一个 ServiceThread
:
-
HAClient 首先会去连接 master 节点,如果连接超时,或是其它网络情况等原因导致连接中断,HAClient 就会每隔 5 秒重新连接 master 节点。
-
连接到 master 后,先判断是否到了上报偏移量的时间,这个间隔时间由
haSendHeartbeatInterval
配置,即发送心跳的间隔时间,默认是5秒
。 -
上报完偏移量后,Selector 开始监听 master 的消息,监听到消息后开始处理读消息事件,其实就是将 master 发过来的消息再写入 CommitLog。
-
本地写入消息后,重新上报当前 CommitLog 的最大偏移量,表示现在同步到了哪里。
-
最后会判断下消息处理的时间,如果超过 5秒,就会认为网络连接有问题,就关系对 master 的连接,之后会再重新连接 master。
public void run() {
while (!this.isStopped()) {
try {
if (this.connectMaster()) {
// 超出上报偏移量的时间,上报 ack 偏移量
if (this.isTimeToReportOffset()) {
this.reportSlaveMaxOffset(this.currentReportedOffset);
}
// 等待消息
this.selector.select(1000);
// 处理读事件
boolean ok = this.processReadEvent();
// 上报slave最大偏移量(CommitLog 当前偏移量)
if (!reportSlaveMaxOffsetPlus()) {
continue;
}
long interval = HAService.this.getDefaultMessageStore().getSystemClock().now() - this.lastWriteTimestamp;
// 上报时间超过5秒,关闭连接
if (interval > HAService.this.getDefaultMessageStore().getMessageStoreConfig().getHaHousekeepingInterval()) {
this.closeMaster();
}
} else {}
} catch (Exception e) {}
}
}
2、Slave连接Master
HAClient 在构造方法中会打开多路复用器 Selector,在connectMaster
方法中,就会去连接master地址(masterAddress
),拿到连接通道 SocketChannel,然后注册到多路复用器 Selector 上,开始监听 READ
读事件。
可以看到,AcceptSocketService 中是监听 ACCEPT
建立连接事件,HAClient 则监听 READ
读消息事件,也就是说主从复制是有 master 节点发送数据给 slave 去同步,然后 slave 上报同步的偏移量。
private boolean connectMaster() throws ClosedChannelException {
if (null == socketChannel) {
String addr = this.masterAddress.get();
if (addr != null) {
SocketAddress socketAddress = RemotingUtil.string2SocketAddress(addr);
if (socketAddress != null) {
// 连接 Master 节点
this.socketChannel = RemotingUtil.connect(socketAddress);
// 注册到 Selector 里去
if (this.socketChannel != null) {
this.socketChannel.register(this.selector, SelectionKey.OP_READ);
}
}
}
this.currentReportedOffset = HAService.this.defaultMessageStore.getMaxPhyOffset();
this.lastWriteTimestamp = System.currentTimeMillis();
}
return this.socketChannel != null;
}
那 masterAddress
怎么来的呢,在 Broker 启动时,BrokerController 的初始化方法中会去设置 master 地址。
在没有启用 DLeger 技术的情况下,如果是 slave 节点,就会更新 masterAddress 的地址;如果是 master 节点,就会每隔60秒打印一次 master 和 slave 之间的同步偏移量差距。
if (!messageStoreConfig.isEnableDLegerCommitLog()) {
if (BrokerRole.SLAVE == this.messageStoreConfig.getBrokerRole()) {
if (this.messageStoreConfig.getHaMasterAddress() != null && this.messageStoreConfig.getHaMasterAddress().length() >= 6) {
// 更新HA地址
this.messageStore.updateHaMasterAddress(this.messageStoreConfig.getHaMasterAddress());
this.updateMasterHAServerAddrPeriodically = false;
} else {
// 定期更新 HA Server 高可用地址
this.updateMasterHAServerAddrPeriodically = true;
}
} else {
// 每隔60秒打印 Master/Slave 节点同步的差异
this.scheduledExecutorService.scheduleAtFixedRate(() -> {
BrokerController.this.printMasterAndSlaveDiff();
}, 1000 * 10, 1000 * 60, TimeUnit.MILLISECONDS);
}
}
如果配置了 master 的地址 haMasterAddress
,就会直接用这个地址。不过一般不用手动设置这个地址,这样它就可以自己周期性的更新 masterAddress,如果 master 挂了,就会主从切换,其它 slave 节点还可以自动更新新的 master 地址。
Broker 向 NameServer 注册的时候,会返回 haServerAddr
和 masterAddr
,也就是 master 的地址,然后更新 HAClient 的 masterAddress,以及 SlaveSynchronize 的 masterAddr。
private void doRegisterBrokerAll(boolean checkOrderConfig, boolean oneway, TopicConfigSerializeWrapper topicConfigWrapper) {
// 向所有 NameServer 注册
List<RegisterBrokerResult> registerBrokerResultList = this.brokerOuterAPI.registerBrokerAll(....);
if (registerBrokerResultList.size() > 0) {
RegisterBrokerResult registerBrokerResult = registerBrokerResultList.get(0);
if (registerBrokerResult != null) {
// 周期性的更新HA地址
if (this.updateMasterHAServerAddrPeriodically && registerBrokerResult.getHaServerAddr() != null) {
this.messageStore.updateHaMasterAddress(registerBrokerResult.getHaServerAddr());
}
// 主从节点,设置Master地址
this.slaveSynchronize.setMasterAddr(registerBrokerResult.getMasterAddr());
}
}
}
Master 推送数据
1、HAConnection
slave 与 master 建立连接后,master 会创建一个 HAConnection
来保持与 slave 的连接状态,并支持读写数据。
可以看到 HAConnection 在创建时主要是做了一些 SocketChannel 的配置,然后对 HAService 中的连接数量 connectionCount
自增。其中最重要的便是创建并启动了 WriteSocketService
和 ReadSocketService
两个组件,看名字就知道是在处理网络连接的写数据和读数据;这两个组件都是一个 ServiceThread,也就是后台线程在异步运行着一个任务。
另外还有两个偏移量属性,slaveRequestOffset
表示slave请求获取的偏移量,slaveAckOffset
表示slave同步数据后ack的偏移量。
public class HAConnection {
private final HAService haService;
private final SocketChannel socketChannel;
private volatile long slaveRequestOffset = -1;
private volatile long slaveAckOffset = -1;
private final WriteSocketService writeSocketService;
private final ReadSocketService readSocketService;
public HAConnection(final HAService haService, final SocketChannel socketChannel) throws IOException {
this.haService = haService;
this.socketChannel = socketChannel;
// 配置
this.socketChannel.configureBlocking(false);
....
// 读写组件
this.writeSocketService = new WriteSocketService(this.socketChannel);
this.readSocketService = new ReadSocketService(this.socketChannel);
// 连接数量+1
this.haService.getConnectionCount().incrementAndGet();
}
public void start() {
this.readSocketService.start();
this.writeSocketService.start();
}
}
2、Master向Slave推送消息
可以看到 WriteSocketService 在创建时,主要就是会向 SocketChannel 注册 WRITE
事件,也就是写入数据。
先记住它的一些属性:
- headerSize:数据的头大小,可以看出应该是由一个8字节和4字节的数据组成
- byteBufferHeader:用来写数据头的一个缓冲区,8+4 字节长度
- nextTransferFromWhere:表示从哪个位置开始传输
- selectMappedBufferResult:这个是从 MappedFile 返回的查询数据对象
- lastWriteOver:最后一次写数据是否完成
- lastWriteTimestamp:最后一次写数据的时间戳
class WriteSocketService extends ServiceThread {
private final int headerSize = 8 + 4;
private final ByteBuffer byteBufferHeader = ByteBuffer.allocate(headerSize);
private long nextTransferFromWhere = -1;
private SelectMappedBufferResult selectMappedBufferResult;
private boolean lastWriteOver = true;
private long lastWriteTimestamp = System.currentTimeMillis();
public WriteSocketService(final SocketChannel socketChannel) throws IOException {
this.selector = RemotingUtil.openSelector();
this.socketChannel = socketChannel;
// 注册到多路复用器,监听通道写事件
this.socketChannel.register(this.selector, SelectionKey.OP_WRITE);
this.setDaemon(true);
}
}
再来看 WriteSocketService 的 run 方法:
-
首先判断 slaveRequestOffset,正常来说,slave 连接到 master 后,会上报一次自己的偏移量,初始值为 0,所以在还没开始传输数据的时候 slaveRequestOffset = 0。
-
还没开始传输前,nextTransferFromWhere = -1,这时就会去获取 CommitLog 的最大偏移量 maxOffset,然后计算master的偏移量:
masterOffset = maxOffset - maxOffset % commitLogFileSize
,这么算下来masterOffset其实就等于最后一个 MappedFile 的起始偏移量fileFromOffset
。然后将 nextTransferFromWhere 更新为 masterOffset。如果已经同步过了,那么 nextTransferFromWhere > 0, 就会将其更新为slaveRequestOffset
。这一步就是在更新下一次开始传输的位置,如果是一次传输,那么就从第一个 MappedFile 的起始位置开始,否则就是从slave之前同步的位置开始。
而 CommitLog 的 getMaxOffset() 看源码会发现其实就是获取最后一个 MappedFile 的写入位置,即
maxOffset=fileFromOffset + readPosition
,fileFromOffset 就是 MappedFile 的起始偏移量,readPosition 在存在 writeBuffer 时就是 committedPosition,否则就是 wrotePosition。 -
接着判断上一次写入数据是否完成(lastWriteOver),未完成就继续之前的数据传输。否则会每隔
5秒
调一次transferData()
来传输数据,它这里会先向byteBufferHeader
写入8个字节的 nextTransferFromWhere。 -
之后会根据 nextTransferFromWhere 去 CommitLog 读取数据,得到
SelectMappedBufferResult
,这就是要传输的消息数据。然后可以看到一次传输的数据不会超过
haTransferBatchSize(32KB)
大小,然后将 nextTransferFromWhere 加上这次传输的数据长度,表示下一次要传输的位置。然后更新byteBufferHeader
,前8字节存储当前传输的偏移量,后4字节存储当前传输的数据大小。最后就调用transferData()
传输数据。
public void run() {
while (!this.isStopped()) {
this.selector.select(1000);
if (-1 == HAConnection.this.slaveRequestOffset) {
Thread.sleep(10);
continue;
}
// 更新下一个传输的偏移量 nextTransferFromWhere
if (-1 == this.nextTransferFromWhere) {
if (0 == HAConnection.this.slaveRequestOffset) {
long masterOffset = getDefaultMessageStore().getCommitLog().getMaxOffset();
masterOffset = masterOffset - (masterOffset % getMessageStoreConfig().getMappedFileSizeCommitLog());
this.nextTransferFromWhere = masterOffset;
} else {
this.nextTransferFromWhere = HAConnection.this.slaveRequestOffset;
}
}
// 向slave写入传输的开始位置
if (this.lastWriteOver) {
long interval = getSystemClock().now() - this.lastWriteTimestamp;
// 间隔5秒写入数据
if (interval > getMessageStoreConfig().getHaSendHeartbeatInterval()) {
this.byteBufferHeader.position(0);
this.byteBufferHeader.limit(headerSize);
this.byteBufferHeader.putLong(this.nextTransferFromWhere);
this.byteBufferHeader.putInt(0);
this.byteBufferHeader.flip();
this.lastWriteOver = this.transferData();
}
} else {
this.lastWriteOver = this.transferData();
}
// 查询数据,同步数据
SelectMappedBufferResult selectResult = getDefaultMessageStore().getCommitLogData(this.nextTransferFromWhere);
if (selectResult != null) {
int size = selectResult.getSize();
if (size > getMessageStoreConfig().getHaTransferBatchSize()) {
size = getMessageStoreConfig().getHaTransferBatchSize();
}
// 下一次的偏移量
long thisOffset = this.nextTransferFromWhere;
this.nextTransferFromWhere += size;
selectResult.getByteBuffer().limit(size);
this.selectMappedBufferResult = selectResult;
// Build Header
this.byteBufferHeader.position(0);
this.byteBufferHeader.limit(headerSize);
this.byteBufferHeader.putLong(thisOffset); // 写入传输的偏移量起点
this.byteBufferHeader.putInt(size); // 写入数据长度
this.byteBufferHeader.flip();
this.lastWriteOver = this.transferData();
} else {
// 没有数据传输就等待
HAConnection.this.haService.getWaitNotifyObject().allWaitForRunning(100);
}
}
..........
}
3、读取消息数据
WriteSocketService 会根据 nextTransferFromWhere
找 CommitLog 获取要同步的数据,可以看到它首先会根据这个偏移量找到所在的 MappedFile,然后计算出在 nextTransferFromWhere 在 MappedFile 中的相对位置,然后根据这个相对偏移量去 MappedFile 读取数据。
public SelectMappedBufferResult getData(final long offset /* 物理偏移量 */, final boolean returnFirstOnNotFound) {
int mappedFileSize = getMessageStoreConfig().getMappedFileSizeCommitLog();
MappedFile mappedFile = this.mappedFileQueue.findMappedFileByOffset(offset, returnFirstOnNotFound);
if (mappedFile != null) {
int pos = (int) (offset % mappedFileSize);
SelectMappedBufferResult result = mappedFile.selectMappedBuffer(pos);
return result;
}
return null;
}
接着看 MappedFile 里是如何根据偏移量读取数据的,它首先要判断读到什么位置(readPosition),这个值在 writeBuffer 存在的时候就是 committedPosition 的位置,这个位置之前的数据是已经commit到MappedByteBuffer了的;否则就是 wrotePosition 的位置,就是 MappedByteBuffer 当前写入的位置。然后根据 readPosition 和传入的要读的开始位置(pos),就能计算出要读取的数据大小(size)。
然后从 MappedByteBuffer 创建当前数据的视图 byteBuffer,将当前position
定位到要读的位置(pos),然后再得到一个新的视图 byteBufferNew,再限制它要读的数据大小(limit),这样byteBufferNew就是从 pos 到 readPosition 之间的数据了。
public SelectMappedBufferResult selectMappedBuffer(int pos) {
int readPosition = getReadPosition();
if (pos < readPosition && pos >= 0) {
if (this.hold()) {
ByteBuffer byteBuffer = this.mappedByteBuffer.slice();
byteBuffer.position(pos);
// 得到要读取数据的大小
int size = readPosition - pos;
// 剩余的 ByteBuffer
ByteBuffer byteBufferNew = byteBuffer.slice();
byteBufferNew.limit(size);
return new SelectMappedBufferResult(this.fileFromOffset + pos, byteBufferNew, size, this);
}
}
return null;
}
public int getReadPosition() {
return this.writeBuffer == null ? this.wrotePosition.get() : this.committedPosition.get();
}
需要注意的是,每次最多传输32KB
的数据,所以获取到数据 SelectMappedBufferResult 后,还会重新限制要读取的数据范围。而 nextTransferFromWhere 也会变更为下一个位置。
int size = selectResult.getSize();
if (size > getMessageStoreConfig().getHaTransferBatchSize()) {
size = getMessageStoreConfig().getHaTransferBatchSize();
}
// 下一次的偏移量
long thisOffset = this.nextTransferFromWhere;
this.nextTransferFromWhere += size;
// 限制大小
selectResult.getByteBuffer().limit(size);
4、消息数据传输
再来看数据传输部分,首先会向 slave 节点写入请求头,这个请求头已经分析过,包含一个8字节的当前传输的起始偏移量(thisOffset),以及传输的数据大小(size)。如果传输失败,比如网络问题,会重试三次。传输成功则更新最后一次写入时间 lastWriteTimestamp 为当前时间。
请求头写完之后,就开始写消息数据,如果失败同样也会重试三次,成功则更新 lastWriteTimestamp。
private boolean transferData() throws Exception {
int writeSizeZeroTimes = 0;
// 写入请求头: thisOffset+size
while (this.byteBufferHeader.hasRemaining()) {
int writeSize = this.socketChannel.write(this.byteBufferHeader);
if (writeSize > 0) {
writeSizeZeroTimes = 0;
this.lastWriteTimestamp = getSystemClock().now();
} else if (writeSize == 0) {
if (++writeSizeZeroTimes >= 3) {
break;
}
} else {...}
}
...
writeSizeZeroTimes = 0;
// 写入消息
if (!this.byteBufferHeader.hasRemaining()) {
while (this.selectMappedBufferResult.getByteBuffer().hasRemaining()) {
int writeSize = this.socketChannel.write(this.selectMappedBufferResult.getByteBuffer());
if (writeSize > 0) {
writeSizeZeroTimes = 0;
this.lastWriteTimestamp = HAConnection.this.haService.getDefaultMessageStore().getSystemClock().now();
} else if (writeSize == 0) {
if (++writeSizeZeroTimes >= 3) {
break;
}
} else {...}
}
}
...
// 请求头和消息体是否都已经传输完成
boolean result = !this.byteBufferHeader.hasRemaining() && !this.selectMappedBufferResult.getByteBuffer().hasRemaining();
return result;
}
Slave 接收数据
在 HAClient 中,slave 会监听处理来自 master 的消息,它会从连接通道把数据读到 byteBufferRead 中,如果没有读取到数据,也会重试三次。读取到数据后,就会分发读请求去处理。
private boolean processReadEvent() {
int readSizeZeroTimes = 0;
while (this.byteBufferRead.hasRemaining()) {
try {
// 读取数据
int readSize = this.socketChannel.read(this.byteBufferRead);
if (readSize > 0) {
readSizeZeroTimes = 0;
boolean result = this.dispatchReadRequest();
} else if (readSize == 0) {
if (++readSizeZeroTimes >= 3) {
break;
}
} else {...}
} catch (IOException e) {...}
}
return true;
}
master 写入的请求头占 12字节,消息不超过32KB,slave 接收到后会读到 byteBufferRead
缓冲区中,这个读缓冲区的大小为4MB
,所以可以放入很多条来自master的数据。所以 HAClient 会有一个 dispatchPosition
来表示消息处理的位置。
每次处理的消息至少得有12字节,master 会单独发请求头过来,也可能会接着发送消息数据过来。
首先就会读取请求头数据,前8字节的物理偏移量masterPhyOffset
,接着4字节的消息长度bodySize
。然后判断当前slave已经写入的物理偏移量slavePhyOffset
,与master当前传输的偏移量(masterPhyOffset)是否一致,如果不一致,说明数据同步有问题,这时就会返回 false,返回 false 之后就会关闭连接通道,之后重新连接 master,重新建立连接时master中的 HAConnection 也会重建,然后 slave 会重新上报当前的偏移量,那么 WriteSocketService 中的 nextTransferFromWhere
就会更新为 slave 节点的物理偏移量,达到主从偏移量同步的目的。
接着就是将消息体追加到 CommitLog 中,进去可以看到就是将消息体数据写入 MappedFile 的 MappedByteBuffer 中。写完之后就更新 dispatchPosition,表示当前已处理消息的位置,下一次就从这个位置开始处理消息。
private boolean dispatchReadRequest() {
final int msgHeaderSize = 8 + 4; // phyoffset + size
while (true) {
int diff = this.byteBufferRead.position() - this.dispatchPosition;
// 读缓冲区还有数据
if (diff >= msgHeaderSize) {
// Master 返回的物理偏移量
long masterPhyOffset = this.byteBufferRead.getLong(this.dispatchPosition);
// 消息大小
int bodySize = this.byteBufferRead.getInt(this.dispatchPosition + 8);
// 从节点的物理偏移量
long slavePhyOffset = HAService.this.defaultMessageStore.getMaxPhyOffset();
// 主从偏移量不一致
if (slavePhyOffset != masterPhyOffset) {
return false;
}
if (diff >= (msgHeaderSize + bodySize)) {
byte[] bodyData = byteBufferRead.array();
int dataStart = this.dispatchPosition + msgHeaderSize;
// 向 CommitLog 追加消息
HAService.this.defaultMessageStore.appendToCommitLog(masterPhyOffset, bodyData, dataStart, bodySize);
// 更新消息处理位置
this.dispatchPosition += msgHeaderSize + bodySize;
// 上报偏移量
if (!reportSlaveMaxOffsetPlus()) {
return false;
}
continue;
}
}
// 读缓冲区没有空闲空间了
if (!this.byteBufferRead.hasRemaining()) {
this.reallocateByteBuffer();
}
}
return true;
}
写完数据后,会重新上报偏移量,进去可以看到,主要是判断 CommitLog 已写入的偏移量如果大于当前上报的偏移量currentReportedOffset
,就会更新 currentReportedOffset 为 CommitLog 的偏移量,然后上报给 master 节点。而上报 slave 偏移量其实就是向连接通道写入8字节的偏移量长度。
private boolean reportSlaveMaxOffsetPlus() {
boolean result = true;
// salve 节点 CommitLog 的最大偏移量
long currentPhyOffset = HAService.this.defaultMessageStore.getMaxPhyOffset();
// CommitLog 写入的偏移量已经大于上报的偏移量
if (currentPhyOffset > this.currentReportedOffset) {
// 直接更新为 CommitLog 的偏移量
this.currentReportedOffset = currentPhyOffset;
// 然后重新上报偏移量
result = this.reportSlaveMaxOffset(this.currentReportedOffset);
}
return result;
}
private boolean reportSlaveMaxOffset(final long maxOffset) {
this.reportOffset.position(0);
this.reportOffset.limit(8);
this.reportOffset.putLong(maxOffset);
this.reportOffset.position(0);
this.reportOffset.limit(8);
this.socketChannel.write(this.reportOffset);
lastWriteTimestamp = HAService.this.defaultMessageStore.getSystemClock().now();
return !this.reportOffset.hasRemaining();
}
读缓冲区 byteBufferRead 没有剩余空间时,就会重新分配。虽然 byteBufferRead 写满了,但可能数据还没处理完,这时会先将备份缓冲区 byteBufferBackup 复位到0,将剩余的数据先写入备份缓冲区中,然后将 byteBufferRead 和 byteBufferBackup 交换,再将处理位置 dispatchPosition 改为 0,之后 byteBufferRead 可以继续接收消息,然后从 0 开始处理消息。
private void reallocateByteBuffer() {
int remain = READ_MAX_BUFFER_SIZE - this.dispatchPosition;
// ByteBuffer 已经写满了,但数据还没分发处理完
if (remain > 0) {
this.byteBufferRead.position(this.dispatchPosition);
this.byteBufferBackup.position(0);
this.byteBufferBackup.limit(READ_MAX_BUFFER_SIZE);
// 把剩余数据写入备份的缓冲区中
this.byteBufferBackup.put(this.byteBufferRead);
}
// 交换 byteBufferRead 和 byteBufferBackup
this.swapByteBuffer();
this.byteBufferRead.position(remain);
this.byteBufferRead.limit(READ_MAX_BUFFER_SIZE);
// 交换过后的 byteBufferRead 要从0开始处理数据
this.dispatchPosition = 0;
}
主从偏移量同步
HAClient 中,slave 节点每次同步数据后,会向 master 上报当前 CommitLog 的物理偏移量,每隔5秒也会上报一次。
HAConnection 中的 ReadSocketService 就是用来监听 slave 节点的发送的数据,监听到 READ 事件后,就会调用 processReadEvent() 来处理数据。如果处理READ事件失败或者处理时间超过20秒,就认为与slave的连接超时过期,当前这个连接就不要了,这时就会关闭相关资源,让slave重新连接。
class ReadSocketService extends ServiceThread {
// 读数据缓冲区大小 1M
private static final int READ_MAX_BUFFER_SIZE = 1024 * 1024;
// 读数据缓冲区
private final ByteBuffer byteBufferRead = ByteBuffer.allocate(READ_MAX_BUFFER_SIZE);
// 处理消息的位置
private int processPosition = 0;
// 最近一次读取数据的时间戳
private volatile long lastReadTimestamp = System.currentTimeMillis();
public void run() {
while (!this.isStopped()) {
try {
this.selector.select(1000);
boolean ok = this.processReadEvent();
if (!ok) { break; }
long interval = getSystemCloc().now() - this.lastReadTimestamp;
// 处理读事件的时间大于20秒
if (interval > getMessageStoreConfig().getHaHousekeepingInterval()) {
break;
}
} catch (Exception e) {break;}
}
// 标记 ServiceThread 停止
this.makeStop();
writeSocketService.makeStop();
// 移除 HAConnection
haService.removeConnection(HAConnection.this);
HAConnection.this.haService.getConnectionCount().decrementAndGet();
// 停止网络通道
HAConnection.this.stopChannelAndSelector(this.socketChannel, this.selector, this.getServiceName());
}
}
来看master对slave上报偏移量的处理,整体看下来会发现跟 HAClient 处理 READ 事件是类似的。
ReadSocketService 用1MB
的读缓冲区 byteBufferRead
来缓冲通道的数据,然后用 processPosition
来表示已处理消息的位置,而 HAClient 每次上报偏移量的长度固定为 8
字节。
首先如果 byteBufferRead 读缓冲如果写满了,就会做复位(flip()
),processPosition 归零。这里和 HAClient 的处理是有区别的,这里如果 byteBufferRead 还有未处理的数据,复位后就直接丢弃了,因为偏移量相对来说没那么重要,反正 slave 至少5秒后会重新上报偏移量。而 slave 因为要同步消息数据,不能丢弃消息数据,所以 slave 会用一个 byteBufferBackup 备份缓冲区来备份剩余的数据。
从连接通道读到数据后,可以看到,它只会处理最后一条消息,读取8字节的偏移量,然后更新到 slaveAckOffset
,如果 slaveAckOffset 大于master的最大物理偏移量,说明主从同步有问题,需要slave重新连接同步。如果 slaveRequestOffset
= -1,会将其更新为最新的 slave 偏移量。
到这里可以理解 slaveAckOffset
和 slaveRequestOffset
的作用了:
-
slaveRequestOffset 只在 slave 第一次连接时才会更新,slave 连接 master 时会更新当前上报偏移量
currentReportedOffset
为自己 CommitLog 的最大偏移量,然后将其上报给 master。之后 WriteSocketService 向 slave 第一次同步数据时,nextTransferFromWhere
就会更新为 slaveRequestOffset,表示从这个位置开始同步数据。 -
而 slaveAckOffset 则会不断增加,表示 slave 已同步的物理偏移量。它会用来判断与 master 的同步是否一致,以及从这个位置开始继续同步数据。
private boolean processReadEvent() {
if (!this.byteBufferRead.hasRemaining()) {
this.byteBufferRead.flip();
this.processPosition = 0;
}
while (this.byteBufferRead.hasRemaining()) {
try {
int readSize = this.socketChannel.read(this.byteBufferRead);
if (readSize > 0) {
this.lastReadTimestamp = getSystemClock().now();
// 至少要读到8个字节(至少是一条数据,一次请求)
if ((this.byteBufferRead.position() - this.processPosition) >= 8) {
// 最后一条消息的末尾偏移量
int pos = this.byteBufferRead.position() - (this.byteBufferRead.position() % 8);
// slave 上报的偏移量
long readOffset = this.byteBufferRead.getLong(pos - 8);
// 处理位置直接拉到最后
this.processPosition = pos;
// 设置 slaveAckOffset,slave 上报的偏移量
HAConnection.this.slaveAckOffset = readOffset;
if (HAConnection.this.slaveRequestOffset < 0) {
HAConnection.this.slaveRequestOffset = readOffset;
}
// Slave ack 偏移量 大于 最大物理偏移量
else if (HAConnection.this.slaveAckOffset > getDefaultMessageStore().getMaxPhyOffset()) {
return false;
}
// 从节点已经接收到一些数据了,通知HAService传输数据给从节点
haService.notifyTransferSome(HAConnection.this.slaveAckOffset);
}
}
}
}
更新 slaveAckOffset 偏移量之后,还会发一个通知,更新 HAService 中的 push2SlaveMaxOffset
,即推送到 slave 的最大偏移量。然后再通知 GroupTransferService
数据已经传输了一部分到 slave 节点。
public void notifyTransferSome(final long offset) {
for (long value = this.push2SlaveMaxOffset.get(); offset > value; ) {
boolean ok = this.push2SlaveMaxOffset.compareAndSet(value, offset);
if (ok) {
this.groupTransferService.notifyTransferSome();
break;
} else {
value = this.push2SlaveMaxOffset.get();
}
}
}
CommitLog 提交复制请求
1、提交replica请求
在 CommitLog 将消息写入 MappedFile 后,会进行 flush 刷盘和 replica 复制的两个操作,在上一篇文章已经介绍过刷盘机制,这里就来看下 replica 复制。
从 submitReplicaRequest
可以看出,当前Broker角色必须是 SYNC_MASTER
才会用 HAService 进行复制。Slave 节点可用的情况下,HAService 提交 GroupCommitRequest 请求,然后唤醒 HAService 的工作线程,最后同步等待同步结果。
可以看到,提交 replica 请求,跟同步提交 flush 请求很相似,同步 flush 是用 GroupCommitService 提交 GroupCommitRequest;同步 replica 是用 HAService 提交 GroupCommitRequest,那基本的工作原理也是类似的了。
public CompletableFuture<PutMessageStatus> submitReplicaRequest(AppendMessageResult result, MessageExt messageExt) {
if (BrokerRole.SYNC_MASTER == this.defaultMessageStore.getMessageStoreConfig().getBrokerRole()) {
HAService service = this.defaultMessageStore.getHaService();
if (messageExt.isWaitStoreMsgOK()) {
// 通过 HAService 判断 Slave 节点是否OK
if (service.isSlaveOK(result.getWroteBytes() + result.getWroteOffset())) {
GroupCommitRequest request = new GroupCommitRequest(result.getWroteOffset() + result.getWroteBytes(),
this.defaultMessageStore.getMessageStoreConfig().getSlaveTimeout());
service.putRequest(request);
service.getWaitNotifyObject().wakeupAll();
return request.future(); // 同步等待
} else {
return CompletableFuture.completedFuture(PutMessageStatus.SLAVE_NOT_AVAILABLE);
}
}
}
return CompletableFuture.completedFuture(PutMessageStatus.PUT_OK);
}
对 Slave 节点的状态判断如下,传入的参数是当前 master 节点写入的位置,首先要有连接到master的slave节点,然后(masterPutWhere - push2SlaveMaxOffset) < 256M
,push2SlaveMaxOffset 代表的是推送到slave节点的最大偏移量;就是说主从同步延迟数据量不超过 256M,不能落后太多,落后太多说明已经很久没向slave节点推送数据同步了,可能就是slave节点已经不可用了。
public boolean isSlaveOK(final long masterPutWhere) {
// 连接到 master 的 slave 数量
boolean result = this.connectionCount.get() > 0;
result = result && ((masterPutWhere - this.push2SlaveMaxOffset.get()) < getMessageStoreConfig().getHaSlaveFallbehindMax());
return result;
}
2、GroupTransferService
HAService 会创建一个 GroupTransferService,跟 GroupCommitService 类似,它也有两个读和写的集合,提交replica请求时放入 requestsWrite,ServiceThread 运行任务等待结束后交换读写集合。开始等待数据同步完成,其实就是在等待 push2SlaveMaxOffset
的值大于 GroupCommitRequest 的 nextOffset。
前面已经分析过,WriteSocketService 线程会不断从 CommitLog 同步数据到 slave 节点;然后 slave 节点会上报当前同步的偏移量,ReadSocketService 线程收到 slave 上报的偏移量之后,会更新通知数据已经传输,并更新 HAService 中的 push2SlaveMaxOffset,这个值就表示当前同步到 slave 节点的物理偏移量。而 CommitLog 写入数据后,提交replica请求开始同步等待,其实就是判断这次写入数据的偏移量(nextOffset)是否已经被同步了。
class GroupTransferService extends ServiceThread {
private final WaitNotifyObject notifyTransferObject = new WaitNotifyObject();
private volatile LinkedList<CommitLog.GroupCommitRequest> requestsWrite = new LinkedList<>();
private volatile LinkedList<CommitLog.GroupCommitRequest> requestsRead = new LinkedList<>();
// 提交请求
public void putRequest(final CommitLog.GroupCommitRequest request) {
this.requestsWrite.add(request);
this.wakeup();
}
// 通知数据传输了一部分
public void notifyTransferSome() {
this.notifyTransferObject.wakeup();
}
// 交换读写集合
private void swapRequests() {
LinkedList<CommitLog.GroupCommitRequest> tmp = this.requestsWrite;
this.requestsWrite = this.requestsRead;
this.requestsRead = tmp;
}
// 等待数据传输完成
private void doWaitTransfer() {
if (!this.requestsRead.isEmpty()) {
for (CommitLog.GroupCommitRequest req : this.requestsRead) {
boolean transferOK = HAService.this.push2SlaveMaxOffset.get() >= req.getNextOffset();
long deadLine = req.getDeadLine();
while (!transferOK && deadLine - System.nanoTime() > 0) {
this.notifyTransferObject.waitForRunning(1000);
transferOK = HAService.this.push2SlaveMaxOffset.get() >= req.getNextOffset();
}
req.wakeupCustomer(transferOK ? PutMessageStatus.PUT_OK : PutMessageStatus.FLUSH_SLAVE_TIMEOUT);
}
this.requestsRead = new LinkedList<>();
}
}
// ServiceThread 运行任务
public void run() {
while (!this.isStopped()) {
this.waitForRunning(10);
this.doWaitTransfer();
}
}
}
SlaveSynchronize
除了 HAService 主从复制之外,SlaveSynchronize 也会去同步一些数据。
BrokerController 创建启动时,就会创建 SlaveSynchronize,在未启动 DLedger 技术时,就会开启 SlaveSynchronize 定时同步。
可以看出,在 Slave 节点,会用一个调度线程每隔 10 秒调用一次 slaveSynchronize 的同步方法。
if (!messageStoreConfig.isEnableDLegerCommitLog()) {
handleSlaveSynchronize(messageStoreConfig.getBrokerRole());
this.registerBrokerAll(true, false, true);
}
private void handleSlaveSynchronize(BrokerRole role) {
// Slave 节点调度服务,每隔10秒调度一次
if (role == BrokerRole.SLAVE) {
this.slaveSynchronize.setMasterAddr(null);
slaveSyncFuture = this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
public void run() {
BrokerController.this.slaveSynchronize.syncAll();
}
}, 1000 * 3, 1000 * 10, TimeUnit.MILLISECONDS);
}
}
而 SlaveSynchronize 的同步就是在从 master 同步 Topic 元数据、消费者偏移量、延迟偏移量、订阅组配置 四类数据。
public void syncAll() {
// 同步 Topic 元数据
this.syncTopicConfig();
// 同步消费者偏移量
this.syncConsumerOffset();
// 同步延迟偏移量
this.syncDelayOffset();
// 同步订阅组配置
this.syncSubscriptionGroupConfig();
}
同步到本地后,会将这些数据持久化到本地的磁盘文件。
总结
HAService 主从复制基本分析完了,下面用一张图来总结下它的实现原理。
-
首先 Master Broker 和 Slave Broker 都会向 NameServer 注册,slave 节点会从 NameServer 得到这组 Broker 中 master(brokerId=0) 的地址 masterAddress。
-
master 节点如果未启用 DLeger 技术,默认启用 HAService 高可用机制,会创建一个 HAService 组件。HAService 通过 AcceptSocketService 来监听 slave 节点的连接请求。slave 节点启动时也会创建 HAService 组件,然后通过 HAClient 来连接 master 节点。
-
master 中 AcceptSocketService 监听到连接请求后,会创建一个 HAConnection 来保持与 slave 之间的连接通道。然后 HAConnection 会创建 ReadSocketService 和 WriteSocketService 两个线程。
-
连接到 master 后,slave 节点先上报自己当前的偏移量(currentReportedOffset),之后同步数据后也会不断上报自己的当前偏移量。ReadSocketService 接收 slave 的请求,更新当前的偏移量 slaveRequestOffset。
-
生产者向 master 发生消息,通过 DefaultMessageStore 写入 CommitLog 中,写入完成后,会提交一个 replica 请求到 GroupTransferService 中,这一步其实就是在等数据同步到 slave 节点。
-
WriteSocketService 线程则会根据当前 slave 同步的偏移量(nextTransferFromWhere),从 CommitLog 读取数据,发送到 slave 节点,slave 收到数据后,再写入 CommitLog,之后再上报当前偏移量。
-
AcceptSocketService 收到 slave 上报的偏移量之后,通知 GroupTransferService 数据已经同步了,那么 CommitLog 提交的 replica 请求也就完成了。
-
最后,在 Slave 节点还会启用 SlaveSynchronize 同步组件,每隔10秒调度一次,从 master 拉取 Topic元数据、延迟偏移量等数据同步到本地。
转载自:https://juejin.cn/post/7283328111504392246