RocketMQ 源码探究 -- 长连接与长轮询实现
本文以 RocketMQ 为例,介绍了长连接和长轮询的基础概念以及代码示例,并结合 RocketMQ 代码讲解了这两项技术的实际运用,以及 RocketMQ 采用这两个技术的背景。
长连接
什么是长连接
TCP 本身没有长连接短连接的概念,一个连接没有被关闭,就算是「长连接」,如果接收完请求响应就关闭,则为「短连接」
下面是一个基于 ServerSocket 的网络程序,程序监听 8081 端口,将输入倒转后输出到客户端。
public static void main(String[] args) throws Exception {
try (ServerSocket serverSocket = new ServerSocket(8081)) {
Socket socket = serverSocket.accept();
socket.setKeepAlive(true);
BufferedReader in = new BufferedReader(new InputStreamReader(socket.getInputStream()));
PrintWriter out = new PrintWriter(socket.getOutputStream(), true);
while (true) {
String s = in.readLine();
System.out.println("received:" + s + " at:" + System.currentTimeMillis());
out.println(StringUtils.reverse(s));
}
}
}
执行程序,然后通过 telnet 127.0.0.1 8081
与应用程序连接,通过 Wireshark 抓包结果如下。
Wireshark 抓包结果
可以看到除了前三个是用来创建连接的,其余都是每次请求和响应的数据包。
长连接在 RocketMQ 中的应用
长连接主要应用在同 name server 的通信上,包括 producer 向 name server 请求路由信息、broker 向 name server 同步心跳、producer/consumer 向 broker 请求等。
心跳机制
在长连接的保持过程中,服务端需要发现并清理无用客户端连接,在 TCP 中,通过 keepalive 机制实现。
在 Linux 中,默认每隔 7200s 会由服务端向客户端发送探活包,如果收到 RST 回包,则认为客户端已经断开连接,服务端也会将此连接断开,以节省资源。
我们可以通过 netstat -altpno
命令打印距下次发送探活包的剩余时间。
keepalive 机制探活时间
TCP 的 keepalive 机制并不适用于 RocketMQ,原因如下:
- 探活时间过长。 虽然可以修改,但这是操作系统层面的修改,影响全局。
- 网络通不代表客户端存活。 当客户端因 GC 频繁、CPU 负载高等原因已经无法对外提供访问了,但此时网络仍然是通的。
在 RocketMQ 中,更多的是通过应用层的心跳来实现连接保持的功能。
拿 broker 向 name server 发送心跳包来说,broker 每隔 30s 请求一次 name server,name server 也会定期清理长时间(默认 120s)不发送请求的 broker。
下面是 RocketMQ 启动网络的 Netty 代码,可以看到直接禁用了 TCP 的 keepalive 机制,并且利用 IdleStateHandler 做心跳检测的补充。
ServerBootstrap childHandler =
this.serverBootstrap.group(this.eventLoopGroupBoss, this.eventLoopGroupSelector)
.channel(useEpoll() ? EpollServerSocketChannel.class : NioServerSocketChannel.class)
.option(ChannelOption.SO_BACKLOG, 1024)
.option(ChannelOption.SO_REUSEADDR, true)
// 禁用 TCP 的 keepalive 机制。
.option(ChannelOption.SO_KEEPALIVE, false)
.childOption(ChannelOption.TCP_NODELAY, true)
.childOption(ChannelOption.SO_SNDBUF, nettyServerConfig.getServerSocketSndBufSize())
.childOption(ChannelOption.SO_RCVBUF, nettyServerConfig.getServerSocketRcvBufSize())
.localAddress(new InetSocketAddress(this.nettyServerConfig.getListenPort()))
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ch.pipeline()
.addLast(defaultEventExecutorGroup, HANDSHAKE_HANDLER_NAME, handshakeHandler)
.addLast(defaultEventExecutorGroup,
encoder,
new NettyDecoder(),
// 启用心跳检测 Handler。
new IdleStateHandler(0, 0, nettyServerConfig.getServerChannelMaxIdleTimeSeconds()),
connectionManageHandler,
serverHandler
);
}
});
长连接优势
长连接适用于点对点的,操作频繁的场景。
对于 RocketMQ 来说再合适不过了,RocketMQ 中各组件相对稳定,路由关系比较确定。
RocketMQ 需要频繁地发送和接收消息,各个组件之间通过心跳沟通。符合操作频繁场景。
长轮询
什么是长轮询
轮询(Polling)简单来说就是定时地查询某个外部系统的状态,一般等到变更到特定状态时,轮询结束。
// client code
while (true) {
String status = sendRequest(param);
if (Objects.equals(status, "ok") {
break;
} else {
TimeUnit.SECONDS.sleep(1);
}
}
Long polling is itself not a true push; long polling is a variation of the traditional polling technique, but it allows emulating a push mechanism under circumstances where a real push is not possible. — wiki
长轮询(Long polling)是轮询的一种变体,通过服务端的操作,将 pull 操作模拟成 push 操作。
// server code
long deadline = pollTime + System.currentTimeMillis();
while (true) {
if (System.currentTimeMillis() < deadline) {
Result result = getResult(param);
if (result != null) {
return result;
} else {
TimeUnit.SECONDS.sleep(1);
}
} else {
return null;
}
}
长轮询在 RocketMQ 中的应用
长轮询主要用于 consumer 和 broker 的连接上。
RocketMQ 支持推拉两种消费模式,其中推模式是建立在拉模式上的。
在拉模式下,consumer 向 broker 发起拉取数据请求,携带允许暂停时间(默认 30s)请求。broker 通过 PullMessageProcessor#processRequest
处理请求,通过 consume queue 查找 commitlog 数据,如果没有找到且允许长轮询,则将请求放入 PullRequestHoldService
,以 topic 和 queue 为区分值,放入到 List 里,默认每隔 5s 再次处理请求,如果获取到数据,返回。否则再次进入 List,直到获取到数据或者超过最大允许暂停时间。
RocketMQ 长轮询流程
长轮询优势
长轮询机制可以减少 consumer 和 broker 的连接请求数,减少消息时延,提升消息实时率。
总结
- RocketMQ 中默认所有请求都是长连接的,除非手动关闭连接。
- RocketMQ 中长连接采用应用间心跳方式续租。
- 长轮询模式支撑了推拉模式,减少消息时延。
转载自:https://juejin.cn/post/7374725115449688116