likes
comments
collection
share

RocketMQ 源码探究 -- 长连接与长轮询实现

作者站长头像
站长
· 阅读数 46

本文以 RocketMQ 为例,介绍了长连接和长轮询的基础概念以及代码示例,并结合 RocketMQ 代码讲解了这两项技术的实际运用,以及 RocketMQ 采用这两个技术的背景。

长连接

什么是长连接

TCP 本身没有长连接短连接的概念,一个连接没有被关闭,就算是「长连接」,如果接收完请求响应就关闭,则为「短连接」

下面是一个基于 ServerSocket 的网络程序,程序监听 8081 端口,将输入倒转后输出到客户端。

public static void main(String[] args) throws Exception {
    try (ServerSocket serverSocket = new ServerSocket(8081)) {
        Socket socket = serverSocket.accept();
        socket.setKeepAlive(true);
        BufferedReader in = new BufferedReader(new InputStreamReader(socket.getInputStream()));
        PrintWriter out = new PrintWriter(socket.getOutputStream(), true);
        while (true) {
            String s = in.readLine();
            System.out.println("received:" + s + " at:" + System.currentTimeMillis());
            out.println(StringUtils.reverse(s));
        }
    }
}

执行程序,然后通过 telnet 127.0.0.1 8081 与应用程序连接,通过 Wireshark 抓包结果如下。

RocketMQ 源码探究 -- 长连接与长轮询实现

Wireshark 抓包结果

可以看到除了前三个是用来创建连接的,其余都是每次请求和响应的数据包。

长连接在 RocketMQ 中的应用

长连接主要应用在同 name server 的通信上,包括 producer 向 name server 请求路由信息、broker 向 name server 同步心跳、producer/consumer 向 broker 请求等。

心跳机制

在长连接的保持过程中,服务端需要发现并清理无用客户端连接,在 TCP 中,通过 keepalive 机制实现。

在 Linux 中,默认每隔 7200s 会由服务端向客户端发送探活包,如果收到 RST 回包,则认为客户端已经断开连接,服务端也会将此连接断开,以节省资源。

我们可以通过 netstat -altpno 命令打印距下次发送探活包的剩余时间。

RocketMQ 源码探究 -- 长连接与长轮询实现

keepalive 机制探活时间

TCP 的 keepalive 机制并不适用于 RocketMQ,原因如下:

  • 探活时间过长。 虽然可以修改,但这是操作系统层面的修改,影响全局。
  • 网络通不代表客户端存活。 当客户端因 GC 频繁、CPU 负载高等原因已经无法对外提供访问了,但此时网络仍然是通的。

在 RocketMQ 中,更多的是通过应用层的心跳来实现连接保持的功能。

拿 broker 向 name server 发送心跳包来说,broker 每隔 30s 请求一次 name server,name server 也会定期清理长时间(默认 120s)不发送请求的 broker。

下面是 RocketMQ 启动网络的 Netty 代码,可以看到直接禁用了 TCP 的 keepalive 机制,并且利用 IdleStateHandler 做心跳检测的补充。

ServerBootstrap childHandler =
 this.serverBootstrap.group(this.eventLoopGroupBoss, this.eventLoopGroupSelector)
     .channel(useEpoll() ? EpollServerSocketChannel.class : NioServerSocketChannel.class)
     .option(ChannelOption.SO_BACKLOG, 1024)
     .option(ChannelOption.SO_REUSEADDR, true)
      // 禁用 TCP 的 keepalive 机制。
     .option(ChannelOption.SO_KEEPALIVE, false)
     .childOption(ChannelOption.TCP_NODELAY, true)
     .childOption(ChannelOption.SO_SNDBUF, nettyServerConfig.getServerSocketSndBufSize())
     .childOption(ChannelOption.SO_RCVBUF, nettyServerConfig.getServerSocketRcvBufSize())
     .localAddress(new InetSocketAddress(this.nettyServerConfig.getListenPort()))
     .childHandler(new ChannelInitializer<SocketChannel>() {
         @Override
         public void initChannel(SocketChannel ch) throws Exception {
             ch.pipeline()
                 .addLast(defaultEventExecutorGroup, HANDSHAKE_HANDLER_NAME, handshakeHandler)
                 .addLast(defaultEventExecutorGroup,
                     encoder,
                     new NettyDecoder(),
                     // 启用心跳检测 Handler。
                     new IdleStateHandler(00, nettyServerConfig.getServerChannelMaxIdleTimeSeconds()),
                     connectionManageHandler,
                     serverHandler
                 );
         }
     });

长连接优势

长连接适用于点对点的,操作频繁的场景。

对于 RocketMQ 来说再合适不过了,RocketMQ 中各组件相对稳定,路由关系比较确定。

RocketMQ 需要频繁地发送和接收消息,各个组件之间通过心跳沟通。符合操作频繁场景。

长轮询

什么是长轮询

轮询(Polling)简单来说就是定时地查询某个外部系统的状态,一般等到变更到特定状态时,轮询结束。

// client code
while (true) {
 String status = sendRequest(param);
 if (Objects.equals(status, "ok") {
  break;
 } else {
  TimeUnit.SECONDS.sleep(1);
 }
}

Long polling is itself not a true push; long polling is a variation of the traditional polling technique, but it allows emulating a push mechanism under circumstances where a real push is not possible.                             — wiki

长轮询(Long polling)是轮询的一种变体,通过服务端的操作,将 pull 操作模拟成 push 操作。

// server code
long deadline = pollTime + System.currentTimeMillis();
while (true) {
 if (System.currentTimeMillis() < deadline) {
  Result result = getResult(param);
  if (result != null) {
   return result;
  } else {
   TimeUnit.SECONDS.sleep(1);
  }
 } else {
  return null;
 }
}

长轮询在 RocketMQ 中的应用

长轮询主要用于 consumer 和 broker 的连接上。

RocketMQ 支持推拉两种消费模式,其中推模式是建立在拉模式上的。

在拉模式下,consumer 向 broker 发起拉取数据请求,携带允许暂停时间(默认 30s)请求。broker 通过 PullMessageProcessor#processRequest 处理请求,通过 consume queue 查找 commitlog 数据,如果没有找到且允许长轮询,则将请求放入 PullRequestHoldService ,以 topic 和 queue 为区分值,放入到 List 里,默认每隔 5s  再次处理请求,如果获取到数据,返回。否则再次进入 List,直到获取到数据或者超过最大允许暂停时间。

RocketMQ 源码探究 -- 长连接与长轮询实现

RocketMQ  长轮询流程

长轮询优势

长轮询机制可以减少 consumer 和 broker 的连接请求数,减少消息时延,提升消息实时率。

总结

  • RocketMQ 中默认所有请求都是长连接的,除非手动关闭连接。
  • RocketMQ 中长连接采用应用间心跳方式续租。
  • 长轮询模式支撑了推拉模式,减少消息时延。
转载自:https://juejin.cn/post/7374725115449688116
评论
请登录