likes
comments
collection
share

RocketMQ源码系列(8) — 消息存储之主从复制

作者站长头像
站长
· 阅读数 10

这篇文章我们来看下 RocketMQ 的高可用机制之一的主从复制模式,所谓的主从复制就是一主多从模式,生产者向 Master 节点写入数据,然后将数据同步给 Slave 节点做数据备份,消费者可以从 master 或 slave 节点消费数据。RocketMQ 默认配置下是开启主从复制,核心组件为 HAService,功能就是将 master 数据复制到 slave 节点。但其不支持主从自动切换,这种模式下,master 宕机后,集群将不能继续写入消息,但可以从 slave 节点消费消息。

HA 主从复制

HA Master 主节点

1、HAService 核心组件

在 DefaultMessageStore 的构造方法中,会根据是否启用DLedger技术来判断使用哪种类型的高可用机制,DLeger 技术默认是关闭的,也就是默认启用 HAService 高可用机制。

if (!messageStoreConfig.isEnableDLegerCommitLog()) {
    this.haService = new HAService(this);
} else {
    this.haService = null;
}

HAService 组件:

  • List<HAConnection>:主从建立的网络连接,一个 master 可能有多个 slave,每一个 HAConnection 就代表一个 slave 节点
  • AcceptSocketService:接收slave节点的 Socket 服务
  • GroupTransferService:组传输的组件
  • HAClient:主从同步客户端组件
  • WaitNotifyObject:线程阻塞,等待通知组件
  • connectionCount:连接到 master 的slave数量
  • push2SlaveMaxOffset:推送到slave的最大偏移量

在 DefaultMessageStore 的 start() 方法中,会调用 HAService 的 start() 方法来启动这些组件。

public class HAService {
    private final AtomicInteger connectionCount = new AtomicInteger(0);
    private final List<HAConnection> connectionList = new LinkedList<>();
    private final AcceptSocketService acceptSocketService;
    private final DefaultMessageStore defaultMessageStore;
    private final GroupTransferService groupTransferService;
    private final HAClient haClient;
    private final WaitNotifyObject waitNotifyObject = new WaitNotifyObject();
    private final AtomicLong push2SlaveMaxOffset = new AtomicLong(0);

    public HAService(final DefaultMessageStore defaultMessageStore) throws IOException {
        this.defaultMessageStore = defaultMessageStore;
        // HA 监听端口 10912
        this.acceptSocketService = new AcceptSocketService(defaultMessageStore.getMessageStoreConfig().getHaListenPort());
        this.groupTransferService = new GroupTransferService();
        this.haClient = new HAClient();
    }

    public void start() throws Exception {
        this.acceptSocketService.beginAccept();
        this.acceptSocketService.start();
        this.groupTransferService.start();
        this.haClient.start();
    }
}

需要注意的是,HA 监听端口 haListenPort 默认 10912,但是 BrokerStartup 在创建 BrokerController 时,会设置 haListenPort = listenPort + 1,listenPort 就是 Netty 的监听端口。

比如我配置 listenPort=30911,那么 haListenPort=30912

messageStoreConfig.setHaListenPort(nettyServerConfig.getListenPort() + 1);

2、Master监听Slave连接请求

AcceptSocketService 就是用来监听 slave 的连接,然后创建 HAConnection,默认监听的端口是 listenPort + 1

beginAccept() 中就会基于 NIO 去建立监听通道,注册多路复用器 Selector,来监听 ACCEPT 连接请求。

AcceptSocketService 也是一个 ServiceThread,在它的 run 方法中,就是在不断监听slave节点的连接请求,有TCP连接请求过来后,就用连接通道 SocketChannel 创建一个 HAConnection,并启动这个连接,然后将其添加到 HAService 的连接集合 List<HAConnection> 中。

class AcceptSocketService extends ServiceThread {
    private final SocketAddress socketAddressListen;
    private ServerSocketChannel serverSocketChannel;
    private Selector selector;

    public AcceptSocketService(final int port) {
        this.socketAddressListen = new InetSocketAddress(port);
    }

    public void beginAccept() throws Exception {
        // 基于 NIO 建立连接监听
        this.serverSocketChannel = ServerSocketChannel.open();
        // 打开一个多路复用器
        this.selector = RemotingUtil.openSelector();
        // 设置socket重用地址true
        this.serverSocketChannel.socket().setReuseAddress(true);
        // 绑定监听端口
        this.serverSocketChannel.socket().bind(this.socketAddressListen);
        // 非阻塞模式
        this.serverSocketChannel.configureBlocking(false);
        // 注册到多路复用器,监听连接请求
        this.serverSocketChannel.register(this.selector, SelectionKey.OP_ACCEPT);
    }

    @Override
    public void run() {
        while (!this.isStopped()) {
            try {
                // 监听连接到达事件
                this.selector.select(1000);
                // 连接来了,拿到 SelectionKey
                Set<SelectionKey> selected = this.selector.selectedKeys();

                if (selected != null) {
                    for (SelectionKey k : selected) {
                        if ((k.readyOps() & SelectionKey.OP_ACCEPT) != 0) {
                            // 通过 accept 函数完成TCP连接,获取到一个网络连接通道 SocketChannel
                            SocketChannel sc = ((ServerSocketChannel) k.channel()).accept();
                            if (sc != null) {
                                HAConnection conn = new HAConnection(HAService.this, sc);
                                conn.start();
                                HAService.this.addConnection(conn);
                            }
                        }
                    }
                    selected.clear();
                }
            } catch (Exception e) {}
        }
    }
}

HA Slave 从节点

1、HAClient

AcceptSocketService 是 Master 节点用来接收 Slave 连接请求的,HAClient 就是对应的 Slave 客户端,会去连接 Master 节点。

HAClient 有如下一些属性,主从复制数据时都会用到。

  • masterAddress:master 节点的地址
  • reportOffset:上报的偏移量
  • currentReportedOffset:当前上报的偏移量
  • dispatchPosition:分发的位置
  • byteBufferRead:读数据缓冲区
  • byteBufferBackup:备份数据缓冲区
  • lastWriteTimestamp:最近写入数据的时间
class HAClient extends ServiceThread {
    // 读数据缓冲区大小 4MB
    private static final int READ_MAX_BUFFER_SIZE = 1024 * 1024 * 4;
    // Master 地址
    private final AtomicReference<String> masterAddress = new AtomicReference<>();
    // NIO 多路复用器
    private final Selector selector;
    // 与 Master 的连接通道
    private SocketChannel socketChannel;

    // 从节点收到数据后会返回一个 8 字节的 ack 偏移量
    private final ByteBuffer reportOffset = ByteBuffer.allocate(8);
    // 写数据时间
    private long lastWriteTimestamp = System.currentTimeMillis();
    // 当前上报过的偏移量
    private long currentReportedOffset = 0;
    // 分发的位置
    private int dispatchPosition = 0;

    // 读数据缓冲区
    private ByteBuffer byteBufferRead = ByteBuffer.allocate(READ_MAX_BUFFER_SIZE);
    // 备份数据缓冲区
    private ByteBuffer byteBufferBackup = ByteBuffer.allocate(READ_MAX_BUFFER_SIZE);

    public HAClient() throws IOException {
        this.selector = RemotingUtil.openSelector();
    }
}

HAClient 也是一个 ServiceThread

  • HAClient 首先会去连接 master 节点,如果连接超时,或是其它网络情况等原因导致连接中断,HAClient 就会每隔 5 秒重新连接 master 节点。

  • 连接到 master 后,先判断是否到了上报偏移量的时间,这个间隔时间由 haSendHeartbeatInterval 配置,即发送心跳的间隔时间,默认是 5秒

  • 上报完偏移量后,Selector 开始监听 master 的消息,监听到消息后开始处理读消息事件,其实就是将 master 发过来的消息再写入 CommitLog。

  • 本地写入消息后,重新上报当前 CommitLog 的最大偏移量,表示现在同步到了哪里。

  • 最后会判断下消息处理的时间,如果超过 5秒,就会认为网络连接有问题,就关系对 master 的连接,之后会再重新连接 master。

public void run() {
    while (!this.isStopped()) {
        try {
            if (this.connectMaster()) {
                // 超出上报偏移量的时间,上报 ack 偏移量
                if (this.isTimeToReportOffset()) {
                    this.reportSlaveMaxOffset(this.currentReportedOffset);
                }
                // 等待消息
                this.selector.select(1000);
                // 处理读事件
                boolean ok = this.processReadEvent();

                // 上报slave最大偏移量(CommitLog 当前偏移量)
                if (!reportSlaveMaxOffsetPlus()) {
                    continue;
                }

                long interval = HAService.this.getDefaultMessageStore().getSystemClock().now() - this.lastWriteTimestamp;
                // 上报时间超过5秒,关闭连接
                if (interval > HAService.this.getDefaultMessageStore().getMessageStoreConfig().getHaHousekeepingInterval()) {
                    this.closeMaster();
                }
            } else {}
        } catch (Exception e) {}
    }
}

2、Slave连接Master

HAClient 在构造方法中会打开多路复用器 Selector,在connectMaster方法中,就会去连接master地址(masterAddress),拿到连接通道 SocketChannel,然后注册到多路复用器 Selector 上,开始监听 READ 读事件。

可以看到,AcceptSocketService 中是监听 ACCEPT 建立连接事件,HAClient 则监听 READ 读消息事件,也就是说主从复制是有 master 节点发送数据给 slave 去同步,然后 slave 上报同步的偏移量。

private boolean connectMaster() throws ClosedChannelException {
    if (null == socketChannel) {
        String addr = this.masterAddress.get();
        if (addr != null) {
            SocketAddress socketAddress = RemotingUtil.string2SocketAddress(addr);
            if (socketAddress != null) {
                // 连接 Master 节点
                this.socketChannel = RemotingUtil.connect(socketAddress);
                // 注册到 Selector 里去
                if (this.socketChannel != null) {
                    this.socketChannel.register(this.selector, SelectionKey.OP_READ);
                }
            }
        }

        this.currentReportedOffset = HAService.this.defaultMessageStore.getMaxPhyOffset();
        this.lastWriteTimestamp = System.currentTimeMillis();
    }
    return this.socketChannel != null;
}

masterAddress 怎么来的呢,在 Broker 启动时,BrokerController 的初始化方法中会去设置 master 地址。

在没有启用 DLeger 技术的情况下,如果是 slave 节点,就会更新 masterAddress 的地址;如果是 master 节点,就会每隔60秒打印一次 master 和 slave 之间的同步偏移量差距。

if (!messageStoreConfig.isEnableDLegerCommitLog()) {
    if (BrokerRole.SLAVE == this.messageStoreConfig.getBrokerRole()) {
        if (this.messageStoreConfig.getHaMasterAddress() != null && this.messageStoreConfig.getHaMasterAddress().length() >= 6) {
            // 更新HA地址
            this.messageStore.updateHaMasterAddress(this.messageStoreConfig.getHaMasterAddress());
            this.updateMasterHAServerAddrPeriodically = false;
        } else {
            // 定期更新 HA Server 高可用地址
            this.updateMasterHAServerAddrPeriodically = true;
        }
    } else {
        // 每隔60秒打印 Master/Slave 节点同步的差异
        this.scheduledExecutorService.scheduleAtFixedRate(() -> {
            BrokerController.this.printMasterAndSlaveDiff();
        }, 1000 * 10, 1000 * 60, TimeUnit.MILLISECONDS);
    }
}

如果配置了 master 的地址 haMasterAddress,就会直接用这个地址。不过一般不用手动设置这个地址,这样它就可以自己周期性的更新 masterAddress,如果 master 挂了,就会主从切换,其它 slave 节点还可以自动更新新的 master 地址。

Broker 向 NameServer 注册的时候,会返回 haServerAddrmasterAddr,也就是 master 的地址,然后更新 HAClient 的 masterAddress,以及 SlaveSynchronize 的 masterAddr。

private void doRegisterBrokerAll(boolean checkOrderConfig, boolean oneway, TopicConfigSerializeWrapper topicConfigWrapper) {
    // 向所有 NameServer 注册
    List<RegisterBrokerResult> registerBrokerResultList = this.brokerOuterAPI.registerBrokerAll(....);

    if (registerBrokerResultList.size() > 0) {
        RegisterBrokerResult registerBrokerResult = registerBrokerResultList.get(0);
        if (registerBrokerResult != null) {
            // 周期性的更新HA地址
            if (this.updateMasterHAServerAddrPeriodically && registerBrokerResult.getHaServerAddr() != null) {
                this.messageStore.updateHaMasterAddress(registerBrokerResult.getHaServerAddr());
            }
            // 主从节点,设置Master地址
            this.slaveSynchronize.setMasterAddr(registerBrokerResult.getMasterAddr());
        }
    }
}

Master 推送数据

1、HAConnection

slave 与 master 建立连接后,master 会创建一个 HAConnection 来保持与 slave 的连接状态,并支持读写数据。

可以看到 HAConnection 在创建时主要是做了一些 SocketChannel 的配置,然后对 HAService 中的连接数量 connectionCount 自增。其中最重要的便是创建并启动了 WriteSocketServiceReadSocketService 两个组件,看名字就知道是在处理网络连接的写数据和读数据;这两个组件都是一个 ServiceThread,也就是后台线程在异步运行着一个任务。

另外还有两个偏移量属性,slaveRequestOffset 表示slave请求获取的偏移量,slaveAckOffset 表示slave同步数据后ack的偏移量。

public class HAConnection {
    private final HAService haService;
    private final SocketChannel socketChannel;
    private volatile long slaveRequestOffset = -1;
    private volatile long slaveAckOffset = -1;
    
    private final WriteSocketService writeSocketService;
    private final ReadSocketService readSocketService;

    public HAConnection(final HAService haService, final SocketChannel socketChannel) throws IOException {
        this.haService = haService;
        this.socketChannel = socketChannel;
        // 配置
        this.socketChannel.configureBlocking(false);
        ....
        // 读写组件
        this.writeSocketService = new WriteSocketService(this.socketChannel);
        this.readSocketService = new ReadSocketService(this.socketChannel);
        // 连接数量+1
        this.haService.getConnectionCount().incrementAndGet();
    }
    public void start() {
        this.readSocketService.start();
        this.writeSocketService.start();
    }
}    

2、Master向Slave推送消息

可以看到 WriteSocketService 在创建时,主要就是会向 SocketChannel 注册 WRITE 事件,也就是写入数据。

先记住它的一些属性:

  • headerSize:数据的头大小,可以看出应该是由一个8字节和4字节的数据组成
  • byteBufferHeader:用来写数据头的一个缓冲区,8+4 字节长度
  • nextTransferFromWhere:表示从哪个位置开始传输
  • selectMappedBufferResult:这个是从 MappedFile 返回的查询数据对象
  • lastWriteOver:最后一次写数据是否完成
  • lastWriteTimestamp:最后一次写数据的时间戳
class WriteSocketService extends ServiceThread {
    private final int headerSize = 8 + 4;
    private final ByteBuffer byteBufferHeader = ByteBuffer.allocate(headerSize);
    private long nextTransferFromWhere = -1;
    private SelectMappedBufferResult selectMappedBufferResult;
    private boolean lastWriteOver = true;
    private long lastWriteTimestamp = System.currentTimeMillis();

    public WriteSocketService(final SocketChannel socketChannel) throws IOException {
        this.selector = RemotingUtil.openSelector();
        this.socketChannel = socketChannel;
        // 注册到多路复用器,监听通道写事件
        this.socketChannel.register(this.selector, SelectionKey.OP_WRITE);
        this.setDaemon(true);
    }
}

再来看 WriteSocketService 的 run 方法:

  1. 首先判断 slaveRequestOffset,正常来说,slave 连接到 master 后,会上报一次自己的偏移量,初始值为 0,所以在还没开始传输数据的时候 slaveRequestOffset = 0。

  2. 还没开始传输前,nextTransferFromWhere = -1,这时就会去获取 CommitLog 的最大偏移量 maxOffset,然后计算master的偏移量:masterOffset = maxOffset - maxOffset % commitLogFileSize,这么算下来masterOffset其实就等于最后一个 MappedFile 的起始偏移量 fileFromOffset。然后将 nextTransferFromWhere 更新为 masterOffset。如果已经同步过了,那么 nextTransferFromWhere > 0, 就会将其更新为 slaveRequestOffset

    这一步就是在更新下一次开始传输的位置,如果是一次传输,那么就从第一个 MappedFile 的起始位置开始,否则就是从slave之前同步的位置开始。

    而 CommitLog 的 getMaxOffset() 看源码会发现其实就是获取最后一个 MappedFile 的写入位置,即 maxOffset=fileFromOffset + readPosition,fileFromOffset 就是 MappedFile 的起始偏移量,readPosition 在存在 writeBuffer 时就是 committedPosition,否则就是 wrotePosition。

  3. 接着判断上一次写入数据是否完成(lastWriteOver),未完成就继续之前的数据传输。否则会每隔5秒调一次 transferData() 来传输数据,它这里会先向 byteBufferHeader 写入8个字节的 nextTransferFromWhere。

  4. 之后会根据 nextTransferFromWhere 去 CommitLog 读取数据,得到 SelectMappedBufferResult,这就是要传输的消息数据。

    然后可以看到一次传输的数据不会超过 haTransferBatchSize(32KB)大小,然后将 nextTransferFromWhere 加上这次传输的数据长度,表示下一次要传输的位置。然后更新 byteBufferHeader,前8字节存储当前传输的偏移量,后4字节存储当前传输的数据大小。最后就调用 transferData() 传输数据。

public void run() {
    while (!this.isStopped()) {
        this.selector.select(1000);
        if (-1 == HAConnection.this.slaveRequestOffset) {
            Thread.sleep(10);
            continue;
        }
        // 更新下一个传输的偏移量 nextTransferFromWhere
        if (-1 == this.nextTransferFromWhere) {
            if (0 == HAConnection.this.slaveRequestOffset) {
                long masterOffset = getDefaultMessageStore().getCommitLog().getMaxOffset();
                masterOffset = masterOffset - (masterOffset %  getMessageStoreConfig().getMappedFileSizeCommitLog());
                this.nextTransferFromWhere = masterOffset;
            } else {
                this.nextTransferFromWhere = HAConnection.this.slaveRequestOffset;
            }
        }

        // 向slave写入传输的开始位置
        if (this.lastWriteOver) {
            long interval = getSystemClock().now() - this.lastWriteTimestamp;
            // 间隔5秒写入数据
            if (interval > getMessageStoreConfig().getHaSendHeartbeatInterval()) {
                this.byteBufferHeader.position(0);
                this.byteBufferHeader.limit(headerSize);
                this.byteBufferHeader.putLong(this.nextTransferFromWhere);
                this.byteBufferHeader.putInt(0);
                this.byteBufferHeader.flip();
                this.lastWriteOver = this.transferData();
            }
        } else {
            this.lastWriteOver = this.transferData();
        }

        // 查询数据,同步数据
        SelectMappedBufferResult selectResult = getDefaultMessageStore().getCommitLogData(this.nextTransferFromWhere);
        if (selectResult != null) {
            int size = selectResult.getSize();
            if (size >  getMessageStoreConfig().getHaTransferBatchSize()) {
                size = getMessageStoreConfig().getHaTransferBatchSize();
            }

            // 下一次的偏移量
            long thisOffset = this.nextTransferFromWhere;
            this.nextTransferFromWhere += size;

            selectResult.getByteBuffer().limit(size);
            this.selectMappedBufferResult = selectResult;
            // Build Header
            this.byteBufferHeader.position(0);
            this.byteBufferHeader.limit(headerSize);
            this.byteBufferHeader.putLong(thisOffset); // 写入传输的偏移量起点
            this.byteBufferHeader.putInt(size); // 写入数据长度
            this.byteBufferHeader.flip();
            this.lastWriteOver = this.transferData();
        } else {
            // 没有数据传输就等待
            HAConnection.this.haService.getWaitNotifyObject().allWaitForRunning(100);
        }
    }
    ..........
}

3、读取消息数据

WriteSocketService 会根据 nextTransferFromWhere 找 CommitLog 获取要同步的数据,可以看到它首先会根据这个偏移量找到所在的 MappedFile,然后计算出在 nextTransferFromWhere 在 MappedFile 中的相对位置,然后根据这个相对偏移量去 MappedFile 读取数据。

public SelectMappedBufferResult getData(final long offset /* 物理偏移量 */, final boolean returnFirstOnNotFound) {
    int mappedFileSize = getMessageStoreConfig().getMappedFileSizeCommitLog();
    MappedFile mappedFile = this.mappedFileQueue.findMappedFileByOffset(offset, returnFirstOnNotFound);
    if (mappedFile != null) {
        int pos = (int) (offset % mappedFileSize);
        SelectMappedBufferResult result = mappedFile.selectMappedBuffer(pos);
        return result;
    }
    return null;
}

接着看 MappedFile 里是如何根据偏移量读取数据的,它首先要判断读到什么位置(readPosition),这个值在 writeBuffer 存在的时候就是 committedPosition 的位置,这个位置之前的数据是已经commit到MappedByteBuffer了的;否则就是 wrotePosition 的位置,就是 MappedByteBuffer 当前写入的位置。然后根据 readPosition 和传入的要读的开始位置(pos),就能计算出要读取的数据大小(size)。

然后从 MappedByteBuffer 创建当前数据的视图 byteBuffer,将当前position定位到要读的位置(pos),然后再得到一个新的视图 byteBufferNew,再限制它要读的数据大小(limit),这样byteBufferNew就是从 pos 到 readPosition 之间的数据了。

public SelectMappedBufferResult selectMappedBuffer(int pos) {
    int readPosition = getReadPosition();

    if (pos < readPosition && pos >= 0) {
        if (this.hold()) {
            ByteBuffer byteBuffer = this.mappedByteBuffer.slice();
            byteBuffer.position(pos);
            // 得到要读取数据的大小
            int size = readPosition - pos;
            // 剩余的 ByteBuffer
            ByteBuffer byteBufferNew = byteBuffer.slice();
            byteBufferNew.limit(size);
            return new SelectMappedBufferResult(this.fileFromOffset + pos, byteBufferNew, size, this);
        }
    }
    return null;
}
public int getReadPosition() {
    return this.writeBuffer == null ? this.wrotePosition.get() : this.committedPosition.get();
}

需要注意的是,每次最多传输32KB的数据,所以获取到数据 SelectMappedBufferResult 后,还会重新限制要读取的数据范围。而 nextTransferFromWhere 也会变更为下一个位置。

int size = selectResult.getSize();
if (size >  getMessageStoreConfig().getHaTransferBatchSize()) {
    size = getMessageStoreConfig().getHaTransferBatchSize();
}
// 下一次的偏移量
long thisOffset = this.nextTransferFromWhere;
this.nextTransferFromWhere += size;
// 限制大小
selectResult.getByteBuffer().limit(size);

4、消息数据传输

再来看数据传输部分,首先会向 slave 节点写入请求头,这个请求头已经分析过,包含一个8字节的当前传输的起始偏移量(thisOffset),以及传输的数据大小(size)。如果传输失败,比如网络问题,会重试三次。传输成功则更新最后一次写入时间 lastWriteTimestamp 为当前时间。

请求头写完之后,就开始写消息数据,如果失败同样也会重试三次,成功则更新 lastWriteTimestamp。

private boolean transferData() throws Exception {
    int writeSizeZeroTimes = 0;
    // 写入请求头: thisOffset+size
    while (this.byteBufferHeader.hasRemaining()) {
        int writeSize = this.socketChannel.write(this.byteBufferHeader);
        if (writeSize > 0) {
            writeSizeZeroTimes = 0;
            this.lastWriteTimestamp = getSystemClock().now();
        } else if (writeSize == 0) {
            if (++writeSizeZeroTimes >= 3) {
                break;
            }
        } else {...}
    }
    ...
    writeSizeZeroTimes = 0;
    // 写入消息
    if (!this.byteBufferHeader.hasRemaining()) {
        while (this.selectMappedBufferResult.getByteBuffer().hasRemaining()) {
            int writeSize = this.socketChannel.write(this.selectMappedBufferResult.getByteBuffer());
            if (writeSize > 0) {
                writeSizeZeroTimes = 0;
                this.lastWriteTimestamp = HAConnection.this.haService.getDefaultMessageStore().getSystemClock().now();
            } else if (writeSize == 0) {
                if (++writeSizeZeroTimes >= 3) {
                    break;
                }
            } else {...}
        }
    }
    ...
    // 请求头和消息体是否都已经传输完成
    boolean result = !this.byteBufferHeader.hasRemaining() && !this.selectMappedBufferResult.getByteBuffer().hasRemaining();
    return result;
}

Slave 接收数据

在 HAClient 中,slave 会监听处理来自 master 的消息,它会从连接通道把数据读到 byteBufferRead 中,如果没有读取到数据,也会重试三次。读取到数据后,就会分发读请求去处理。

private boolean processReadEvent() {
    int readSizeZeroTimes = 0;
    while (this.byteBufferRead.hasRemaining()) {
        try {
            // 读取数据
            int readSize = this.socketChannel.read(this.byteBufferRead);
            if (readSize > 0) {
                readSizeZeroTimes = 0;
                boolean result = this.dispatchReadRequest();
            } else if (readSize == 0) {
                if (++readSizeZeroTimes >= 3) {
                    break;
                }
            } else {...}
        } catch (IOException e) {...}
    }
    return true;
}

master 写入的请求头占 12字节,消息不超过32KB,slave 接收到后会读到 byteBufferRead 缓冲区中,这个读缓冲区的大小为4MB,所以可以放入很多条来自master的数据。所以 HAClient 会有一个 dispatchPosition 来表示消息处理的位置。

每次处理的消息至少得有12字节,master 会单独发请求头过来,也可能会接着发送消息数据过来。

首先就会读取请求头数据,前8字节的物理偏移量masterPhyOffset,接着4字节的消息长度bodySize。然后判断当前slave已经写入的物理偏移量slavePhyOffset,与master当前传输的偏移量(masterPhyOffset)是否一致,如果不一致,说明数据同步有问题,这时就会返回 false,返回 false 之后就会关闭连接通道,之后重新连接 master,重新建立连接时master中的 HAConnection 也会重建,然后 slave 会重新上报当前的偏移量,那么 WriteSocketService 中的 nextTransferFromWhere 就会更新为 slave 节点的物理偏移量,达到主从偏移量同步的目的。

接着就是将消息体追加到 CommitLog 中,进去可以看到就是将消息体数据写入 MappedFile 的 MappedByteBuffer 中。写完之后就更新 dispatchPosition,表示当前已处理消息的位置,下一次就从这个位置开始处理消息。

private boolean dispatchReadRequest() {
    final int msgHeaderSize = 8 + 4; // phyoffset + size

    while (true) {
        int diff = this.byteBufferRead.position() - this.dispatchPosition;
        // 读缓冲区还有数据
        if (diff >= msgHeaderSize) {
            // Master 返回的物理偏移量
            long masterPhyOffset = this.byteBufferRead.getLong(this.dispatchPosition);
            // 消息大小
            int bodySize = this.byteBufferRead.getInt(this.dispatchPosition + 8);
            // 从节点的物理偏移量
            long slavePhyOffset = HAService.this.defaultMessageStore.getMaxPhyOffset();

            // 主从偏移量不一致
            if (slavePhyOffset != masterPhyOffset) {
                return false;
            }
            if (diff >= (msgHeaderSize + bodySize)) {
                byte[] bodyData = byteBufferRead.array();
                int dataStart = this.dispatchPosition + msgHeaderSize;
                // 向 CommitLog 追加消息
                HAService.this.defaultMessageStore.appendToCommitLog(masterPhyOffset, bodyData, dataStart, bodySize);
                // 更新消息处理位置
                this.dispatchPosition += msgHeaderSize + bodySize;
                // 上报偏移量
                if (!reportSlaveMaxOffsetPlus()) {
                    return false;
                }
                continue;
            }
        }
        // 读缓冲区没有空闲空间了
        if (!this.byteBufferRead.hasRemaining()) {
            this.reallocateByteBuffer();
        }
    }
    return true;
}

写完数据后,会重新上报偏移量,进去可以看到,主要是判断 CommitLog 已写入的偏移量如果大于当前上报的偏移量currentReportedOffset,就会更新 currentReportedOffset 为 CommitLog 的偏移量,然后上报给 master 节点。而上报 slave 偏移量其实就是向连接通道写入8字节的偏移量长度。

private boolean reportSlaveMaxOffsetPlus() {
    boolean result = true;
    // salve 节点 CommitLog 的最大偏移量
    long currentPhyOffset = HAService.this.defaultMessageStore.getMaxPhyOffset();
    // CommitLog 写入的偏移量已经大于上报的偏移量
    if (currentPhyOffset > this.currentReportedOffset) {
        // 直接更新为 CommitLog 的偏移量
        this.currentReportedOffset = currentPhyOffset;
        // 然后重新上报偏移量
        result = this.reportSlaveMaxOffset(this.currentReportedOffset);
    }
    return result;
}

private boolean reportSlaveMaxOffset(final long maxOffset) {
    this.reportOffset.position(0);
    this.reportOffset.limit(8);
    this.reportOffset.putLong(maxOffset);
    this.reportOffset.position(0);
    this.reportOffset.limit(8);

    this.socketChannel.write(this.reportOffset);
    
    lastWriteTimestamp = HAService.this.defaultMessageStore.getSystemClock().now();
    return !this.reportOffset.hasRemaining();
}

读缓冲区 byteBufferRead 没有剩余空间时,就会重新分配。虽然 byteBufferRead 写满了,但可能数据还没处理完,这时会先将备份缓冲区 byteBufferBackup 复位到0,将剩余的数据先写入备份缓冲区中,然后将 byteBufferRead 和 byteBufferBackup 交换,再将处理位置 dispatchPosition 改为 0,之后 byteBufferRead 可以继续接收消息,然后从 0 开始处理消息。

private void reallocateByteBuffer() {
    int remain = READ_MAX_BUFFER_SIZE - this.dispatchPosition;
    // ByteBuffer 已经写满了,但数据还没分发处理完
    if (remain > 0) {
        this.byteBufferRead.position(this.dispatchPosition);

        this.byteBufferBackup.position(0);
        this.byteBufferBackup.limit(READ_MAX_BUFFER_SIZE);
        // 把剩余数据写入备份的缓冲区中
        this.byteBufferBackup.put(this.byteBufferRead);
    }
    // 交换 byteBufferRead 和 byteBufferBackup
    this.swapByteBuffer();

    this.byteBufferRead.position(remain);
    this.byteBufferRead.limit(READ_MAX_BUFFER_SIZE);
    // 交换过后的 byteBufferRead 要从0开始处理数据
    this.dispatchPosition = 0;
}

RocketMQ源码系列(8) — 消息存储之主从复制

主从偏移量同步

HAClient 中,slave 节点每次同步数据后,会向 master 上报当前 CommitLog 的物理偏移量,每隔5秒也会上报一次。

HAConnection 中的 ReadSocketService 就是用来监听 slave 节点的发送的数据,监听到 READ 事件后,就会调用 processReadEvent() 来处理数据。如果处理READ事件失败或者处理时间超过20秒,就认为与slave的连接超时过期,当前这个连接就不要了,这时就会关闭相关资源,让slave重新连接。

class ReadSocketService extends ServiceThread {
    // 读数据缓冲区大小 1M
    private static final int READ_MAX_BUFFER_SIZE = 1024 * 1024;
    // 读数据缓冲区
    private final ByteBuffer byteBufferRead = ByteBuffer.allocate(READ_MAX_BUFFER_SIZE);
    // 处理消息的位置
    private int processPosition = 0;
    // 最近一次读取数据的时间戳
    private volatile long lastReadTimestamp = System.currentTimeMillis();
    
    public void run() {
        while (!this.isStopped()) {
            try {
                this.selector.select(1000);
                boolean ok = this.processReadEvent();
                if (!ok) { break; }
                long interval =  getSystemCloc().now() - this.lastReadTimestamp;
                // 处理读事件的时间大于20秒
                if (interval > getMessageStoreConfig().getHaHousekeepingInterval()) {
                    break;
                }
            } catch (Exception e) {break;}
        }

        // 标记 ServiceThread 停止
        this.makeStop();
        writeSocketService.makeStop();
        // 移除 HAConnection
        haService.removeConnection(HAConnection.this);
        HAConnection.this.haService.getConnectionCount().decrementAndGet();
        // 停止网络通道
        HAConnection.this.stopChannelAndSelector(this.socketChannel, this.selector, this.getServiceName());
    }
}    

来看master对slave上报偏移量的处理,整体看下来会发现跟 HAClient 处理 READ 事件是类似的。

ReadSocketService 用1MB的读缓冲区 byteBufferRead 来缓冲通道的数据,然后用 processPosition 来表示已处理消息的位置,而 HAClient 每次上报偏移量的长度固定为 8字节。

首先如果 byteBufferRead 读缓冲如果写满了,就会做复位(flip()),processPosition 归零。这里和 HAClient 的处理是有区别的,这里如果 byteBufferRead 还有未处理的数据,复位后就直接丢弃了,因为偏移量相对来说没那么重要,反正 slave 至少5秒后会重新上报偏移量。而 slave 因为要同步消息数据,不能丢弃消息数据,所以 slave 会用一个 byteBufferBackup 备份缓冲区来备份剩余的数据。

从连接通道读到数据后,可以看到,它只会处理最后一条消息,读取8字节的偏移量,然后更新到 slaveAckOffset,如果 slaveAckOffset 大于master的最大物理偏移量,说明主从同步有问题,需要slave重新连接同步。如果 slaveRequestOffset = -1,会将其更新为最新的 slave 偏移量。

到这里可以理解 slaveAckOffsetslaveRequestOffset 的作用了:

  • slaveRequestOffset 只在 slave 第一次连接时才会更新,slave 连接 master 时会更新当前上报偏移量 currentReportedOffset 为自己 CommitLog 的最大偏移量,然后将其上报给 master。之后 WriteSocketService 向 slave 第一次同步数据时,nextTransferFromWhere就会更新为 slaveRequestOffset,表示从这个位置开始同步数据。

  • 而 slaveAckOffset 则会不断增加,表示 slave 已同步的物理偏移量。它会用来判断与 master 的同步是否一致,以及从这个位置开始继续同步数据。

private boolean processReadEvent() {
    if (!this.byteBufferRead.hasRemaining()) {
        this.byteBufferRead.flip();
        this.processPosition = 0;
    }
    while (this.byteBufferRead.hasRemaining()) {
        try {
        int readSize = this.socketChannel.read(this.byteBufferRead);
        if (readSize > 0) {
            this.lastReadTimestamp = getSystemClock().now();
            // 至少要读到8个字节(至少是一条数据,一次请求)
            if ((this.byteBufferRead.position() - this.processPosition) >= 8) {
                // 最后一条消息的末尾偏移量
                int pos = this.byteBufferRead.position() - (this.byteBufferRead.position() % 8);
                // slave 上报的偏移量
                long readOffset = this.byteBufferRead.getLong(pos - 8);
                // 处理位置直接拉到最后
                this.processPosition = pos;
                // 设置 slaveAckOffset,slave 上报的偏移量
                HAConnection.this.slaveAckOffset = readOffset;

                if (HAConnection.this.slaveRequestOffset < 0) {
                    HAConnection.this.slaveRequestOffset = readOffset;
                }
                // Slave ack 偏移量 大于 最大物理偏移量
                else if (HAConnection.this.slaveAckOffset > getDefaultMessageStore().getMaxPhyOffset()) {
                    return false;
                }
                // 从节点已经接收到一些数据了,通知HAService传输数据给从节点
                haService.notifyTransferSome(HAConnection.this.slaveAckOffset);
            }
        }
    }
}

更新 slaveAckOffset 偏移量之后,还会发一个通知,更新 HAService 中的 push2SlaveMaxOffset,即推送到 slave 的最大偏移量。然后再通知 GroupTransferService 数据已经传输了一部分到 slave 节点。

public void notifyTransferSome(final long offset) {
    for (long value = this.push2SlaveMaxOffset.get(); offset > value; ) {
        boolean ok = this.push2SlaveMaxOffset.compareAndSet(value, offset);
        if (ok) {
            this.groupTransferService.notifyTransferSome();
            break;
        } else {
            value = this.push2SlaveMaxOffset.get();
        }
    }
}

CommitLog 提交复制请求

1、提交replica请求

在 CommitLog 将消息写入 MappedFile 后,会进行 flush 刷盘和 replica 复制的两个操作,在上一篇文章已经介绍过刷盘机制,这里就来看下 replica 复制。

submitReplicaRequest 可以看出,当前Broker角色必须是 SYNC_MASTER 才会用 HAService 进行复制。Slave 节点可用的情况下,HAService 提交 GroupCommitRequest 请求,然后唤醒 HAService 的工作线程,最后同步等待同步结果。

可以看到,提交 replica 请求,跟同步提交 flush 请求很相似,同步 flush 是用 GroupCommitService 提交 GroupCommitRequest;同步 replica 是用 HAService 提交 GroupCommitRequest,那基本的工作原理也是类似的了。

public CompletableFuture<PutMessageStatus> submitReplicaRequest(AppendMessageResult result, MessageExt messageExt) {
    if (BrokerRole.SYNC_MASTER == this.defaultMessageStore.getMessageStoreConfig().getBrokerRole()) {
        HAService service = this.defaultMessageStore.getHaService();
        if (messageExt.isWaitStoreMsgOK()) {
            // 通过 HAService 判断 Slave 节点是否OK
            if (service.isSlaveOK(result.getWroteBytes() + result.getWroteOffset())) {
                GroupCommitRequest request = new GroupCommitRequest(result.getWroteOffset() + result.getWroteBytes(),
                        this.defaultMessageStore.getMessageStoreConfig().getSlaveTimeout());
                service.putRequest(request);
                service.getWaitNotifyObject().wakeupAll();
                return request.future(); // 同步等待
            } else {
                return CompletableFuture.completedFuture(PutMessageStatus.SLAVE_NOT_AVAILABLE);
            }
        }
    }
    return CompletableFuture.completedFuture(PutMessageStatus.PUT_OK);
}

对 Slave 节点的状态判断如下,传入的参数是当前 master 节点写入的位置,首先要有连接到master的slave节点,然后(masterPutWhere - push2SlaveMaxOffset) < 256M,push2SlaveMaxOffset 代表的是推送到slave节点的最大偏移量;就是说主从同步延迟数据量不超过 256M,不能落后太多,落后太多说明已经很久没向slave节点推送数据同步了,可能就是slave节点已经不可用了。

public boolean isSlaveOK(final long masterPutWhere) {
    // 连接到 master 的 slave 数量
    boolean result = this.connectionCount.get() > 0;
    result = result && ((masterPutWhere - this.push2SlaveMaxOffset.get()) < getMessageStoreConfig().getHaSlaveFallbehindMax());
    return result;
}

2、GroupTransferService

HAService 会创建一个 GroupTransferService,跟 GroupCommitService 类似,它也有两个读和写的集合,提交replica请求时放入 requestsWrite,ServiceThread 运行任务等待结束后交换读写集合。开始等待数据同步完成,其实就是在等待 push2SlaveMaxOffset 的值大于 GroupCommitRequest 的 nextOffset。

前面已经分析过,WriteSocketService 线程会不断从 CommitLog 同步数据到 slave 节点;然后 slave 节点会上报当前同步的偏移量,ReadSocketService 线程收到 slave 上报的偏移量之后,会更新通知数据已经传输,并更新 HAService 中的 push2SlaveMaxOffset,这个值就表示当前同步到 slave 节点的物理偏移量。而 CommitLog 写入数据后,提交replica请求开始同步等待,其实就是判断这次写入数据的偏移量(nextOffset)是否已经被同步了。

class GroupTransferService extends ServiceThread {
    private final WaitNotifyObject notifyTransferObject = new WaitNotifyObject();
    private volatile LinkedList<CommitLog.GroupCommitRequest> requestsWrite = new LinkedList<>();
    private volatile LinkedList<CommitLog.GroupCommitRequest> requestsRead = new LinkedList<>();
    // 提交请求
    public void putRequest(final CommitLog.GroupCommitRequest request) {
        this.requestsWrite.add(request);
        this.wakeup();
    }
    // 通知数据传输了一部分
    public void notifyTransferSome() {
        this.notifyTransferObject.wakeup();
    }
    // 交换读写集合
    private void swapRequests() {
        LinkedList<CommitLog.GroupCommitRequest> tmp = this.requestsWrite;
        this.requestsWrite = this.requestsRead;
        this.requestsRead = tmp;
    }
    // 等待数据传输完成
    private void doWaitTransfer() {
        if (!this.requestsRead.isEmpty()) {
            for (CommitLog.GroupCommitRequest req : this.requestsRead) {
                boolean transferOK = HAService.this.push2SlaveMaxOffset.get() >= req.getNextOffset();
                long deadLine = req.getDeadLine();
                while (!transferOK && deadLine - System.nanoTime() > 0) {
                    this.notifyTransferObject.waitForRunning(1000);
                    transferOK = HAService.this.push2SlaveMaxOffset.get() >= req.getNextOffset();
                }
                req.wakeupCustomer(transferOK ? PutMessageStatus.PUT_OK : PutMessageStatus.FLUSH_SLAVE_TIMEOUT);
            }
            this.requestsRead = new LinkedList<>();
        }
    }
    // ServiceThread 运行任务
    public void run() {
        while (!this.isStopped()) {
            this.waitForRunning(10);
            this.doWaitTransfer();
        }
    }
}

SlaveSynchronize

除了 HAService 主从复制之外,SlaveSynchronize 也会去同步一些数据。

BrokerController 创建启动时,就会创建 SlaveSynchronize,在未启动 DLedger 技术时,就会开启 SlaveSynchronize 定时同步。

可以看出,在 Slave 节点,会用一个调度线程每隔 10 秒调用一次 slaveSynchronize 的同步方法。

if (!messageStoreConfig.isEnableDLegerCommitLog()) {
    handleSlaveSynchronize(messageStoreConfig.getBrokerRole());
    this.registerBrokerAll(true, false, true);
}

private void handleSlaveSynchronize(BrokerRole role) {
    // Slave 节点调度服务,每隔10秒调度一次
    if (role == BrokerRole.SLAVE) {
        this.slaveSynchronize.setMasterAddr(null);
        slaveSyncFuture = this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
            public void run() {
                BrokerController.this.slaveSynchronize.syncAll();
            }
        }, 1000 * 3, 1000 * 10, TimeUnit.MILLISECONDS);
    } 
}

而 SlaveSynchronize 的同步就是在从 master 同步 Topic 元数据、消费者偏移量、延迟偏移量、订阅组配置 四类数据。

public void syncAll() {
    // 同步 Topic 元数据
    this.syncTopicConfig();
    // 同步消费者偏移量
    this.syncConsumerOffset();
    // 同步延迟偏移量
    this.syncDelayOffset();
    // 同步订阅组配置
    this.syncSubscriptionGroupConfig();
}

同步到本地后,会将这些数据持久化到本地的磁盘文件。

RocketMQ源码系列(8) — 消息存储之主从复制

总结

HAService 主从复制基本分析完了,下面用一张图来总结下它的实现原理。

  1. 首先 Master Broker 和 Slave Broker 都会向 NameServer 注册,slave 节点会从 NameServer 得到这组 Broker 中 master(brokerId=0) 的地址 masterAddress。

  2. master 节点如果未启用 DLeger 技术,默认启用 HAService 高可用机制,会创建一个 HAService 组件。HAService 通过 AcceptSocketService 来监听 slave 节点的连接请求。slave 节点启动时也会创建 HAService 组件,然后通过 HAClient 来连接 master 节点。

  3. master 中 AcceptSocketService 监听到连接请求后,会创建一个 HAConnection 来保持与 slave 之间的连接通道。然后 HAConnection 会创建 ReadSocketService 和 WriteSocketService 两个线程。

  4. 连接到 master 后,slave 节点先上报自己当前的偏移量(currentReportedOffset),之后同步数据后也会不断上报自己的当前偏移量。ReadSocketService 接收 slave 的请求,更新当前的偏移量 slaveRequestOffset。

  5. 生产者向 master 发生消息,通过 DefaultMessageStore 写入 CommitLog 中,写入完成后,会提交一个 replica 请求到 GroupTransferService 中,这一步其实就是在等数据同步到 slave 节点。

  6. WriteSocketService 线程则会根据当前 slave 同步的偏移量(nextTransferFromWhere),从 CommitLog 读取数据,发送到 slave 节点,slave 收到数据后,再写入 CommitLog,之后再上报当前偏移量。

  7. AcceptSocketService 收到 slave 上报的偏移量之后,通知 GroupTransferService 数据已经同步了,那么 CommitLog 提交的 replica 请求也就完成了。

  8. 最后,在 Slave 节点还会启用 SlaveSynchronize 同步组件,每隔10秒调度一次,从 master 拉取 Topic元数据、延迟偏移量等数据同步到本地。

RocketMQ源码系列(8) — 消息存储之主从复制