likes
comments
collection
share

『Netty核心』Netty心跳机制

作者站长头像
站长
· 阅读数 46

「这是我参与11月更文挑战的第28天,活动详情查看:2021最后一次更文挑战」。

点赞再看,养成习惯👏👏

心跳检测机制

所谓心跳,即在 TCP 长连接中,客户端和服务器之间定期发送的一种特殊的数据包,通知对方自己还在线,以确保 TCP 连接的有效性。

心跳检测机制:客户端每隔一段时间发送PING消息给服务端,服务端接受到后回复PONG消息。客户端如果在一定时间内没有收到PONG响应,则认为连接断开,服务端如果在一定时间内没有收到来自客户端的PING请求,则认为连接已经断开。通过这种来回的PING-PONG消息机制侦测连接的活跃性。

在 Netty 中,本身也提供了 IdleStateHandler 用于检测连接闲置,该Handler可以检测连接未发生读写事件而触发相应事件。

看下它的构造器:

public IdleStateHandler(int readerIdleTimeSeconds, int writerIdleTimeSeconds, int allIdleTimeSeconds) {
    this((long)readerIdleTimeSeconds, (long)writerIdleTimeSeconds, (long)allIdleTimeSeconds, TimeUnit.SECONDS);
}

解释下三个参数的含义:

  • eaderIdleTimeSeconds 读超时:当在指定的时间间隔内没有从 Channel 读取到数据时,会触发一个 READER_IDLEIdleStateEvent 事件。
  • riterIdleTimeSeconds 写超时:即当在指定的时间间隔内没有数据写入到 Channel 时,会触发一个 WRITER_IDLEIdleStateEvent 事件。
  • llIdleTimeSeconds 读/写超时:即当在指定的时间间隔内没有读或写操作时,会触发一个 ALL_IDLEIdleStateEvent 事件。

注:这三个参数默认的时间单位是。若需要指定其他时间单位,可以使用另一个构造方法:

public IdleStateHandler(long readerIdleTime, long writerIdleTime, long allIdleTime, TimeUnit unit) {
    this(false, readerIdleTime, writerIdleTime, allIdleTime, unit);
}

要实现 Netty 服务端心跳检测机制需要在服务器端的 ChannelInitializer 中加入如下的代码:

pipeline.addLast(new IdleStateHandler(3, 0, 0, TimeUnit.SECONDS));

初步地看下 IdleStateHandler 源码,先看下 IdleStateHandler 中的 channelRead 方法:

『Netty核心』Netty心跳机制

红框代码其实表示该方法只是进行了透传,不做任何业务逻辑处理,让 channelPipe 中的下一个 handler 处理channelRead 方法

我们再看看 channelActive 方法:

『Netty核心』Netty心跳机制

这里有个 initialize 的方法,这是 IdleStateHandler 的精髓,接着探究:

『Netty核心』Netty心跳机制

这边会触发一个 TaskReaderIdleTimeoutTask,这个task里的 run 方法源码是这样的:

『Netty核心』Netty心跳机制

第一个红框代码是用当前时间减去最后一次 channelRead 方法调用的时间,假如这个结果是6s,说明最后一次调用channelRead 已经是6s之前的事情了,你设置的是5s,那么 nextDelay 则为-1,说明超时了,那么第二个红框代码则会触发下一个 handler 的 userEventTriggered 方法:

『Netty核心』Netty心跳机制

如果没有超时则不触发 userEventTriggered 方法。

服务端代码

public class HeartBeatServer {

    public static void main(String[] args) throws InterruptedException {
        EventLoopGroup bossGroup = new NioEventLoopGroup(1);
        EventLoopGroup workGroup = new NioEventLoopGroup();
        try{
            ServerBootstrap serverBootstrap = new ServerBootstrap();
            serverBootstrap.group(bossGroup,workGroup)
                    .channel(NioServerSocketChannel.class)
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel socketChannel) throws Exception {
                            ChannelPipeline channelPipeline = socketChannel.pipeline();
                            channelPipeline.addLast("decoder",new StringDecoder());
                            channelPipeline.addLast("encoder",new StringEncoder());
                            channelPipeline.addLast(new IdleStateHandler(3,0,0, TimeUnit.SECONDS));
                            channelPipeline.addLast(new HeartBeatServerHandler());
                        }
                    });
            System.out.println("netty server start。。");
            ChannelFuture channelFuture = serverBootstrap.bind(8888).sync();
            channelFuture.channel().closeFuture().sync();
        }finally {
            bossGroup.shutdownGracefully();
            workGroup.shutdownGracefully();
        }
    }
}

继承 SimpleChannelInboundHandler 重写 channelRead 方法和 userEventTriggered 方法

public class HeartBeatServerHandler extends SimpleChannelInboundHandler<String> {

    int readIdleTimes = 0;

    @Override
    protected void channelRead0(ChannelHandlerContext ctx, String s) throws Exception {
        System.out.println(" ====== > [server] message received : " + s);
        if ("Heartbeat Packet".equals(s)){
            ctx.channel().writeAndFlush("ok");
        }else{
            System.out.println(" 其他信息处理...");
        }
    }

    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
        IdleStateEvent event = (IdleStateEvent) evt;
        String eventType = null;
        switch (event.state()){
            case READER_IDLE:
                eventType = "读空闲";
                readIdleTimes++; // 读空闲的计数加1
                break;
            case WRITER_IDLE:
                eventType = "写空闲";
                // 不处理
                break;
            case ALL_IDLE:
                eventType = "读写空闲";
                // 不处理
                break;
        }
        System.out.println(ctx.channel().remoteAddress() + "超时事件:" + eventType);
        if (readIdleTimes > 3){
            System.out.println(" [server]读空闲超过3次,关闭连接,释放更多资源");
            ctx.channel().writeAndFlush("idle close");
            ctx.channel().close();
        }
    }

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        System.err.println("=== " + ctx.channel().remoteAddress() + " is active ===");
    }
}

客户端代码

public class HeartBeatClient {

    public static void main(String[] args) throws InterruptedException {
        EventLoopGroup group = new NioEventLoopGroup();
        try{
            Bootstrap bootstrap = new Bootstrap();
            bootstrap.group(group)
                    .channel(NioSocketChannel.class)
                    .handler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel socketChannel) throws Exception {
                            ChannelPipeline channelPipeline = socketChannel.pipeline();
                            channelPipeline.addLast("decoder",new StringDecoder());
                            channelPipeline.addLast("encoder",new StringEncoder());
                            channelPipeline.addLast(new HeartBeatClientHandler());
                        }
                    });
            System.out.println("netty client start。。");
            Channel channel = bootstrap.connect("127.0.0.1", 8888).sync().channel();
            String text = "Heartbeat Packet";
            Random random = new Random();
            while(channel.isActive()){
                int num = random.nextInt(8);
                Thread.sleep(num*1000);
                channel.writeAndFlush(text);
            }
        }catch (Exception e){
            e.printStackTrace();
        }finally {
            group.shutdownGracefully();
        }
    }

    static class HeartBeatClientHandler extends SimpleChannelInboundHandler<String>{
        @Override
        protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {
            System.out.println(" client received :" + msg);
            if (msg != null && msg.equals("idle close")) {
                System.out.println(" 服务端关闭连接,客户端也关闭");
                ctx.channel().closeFuture();
            }
        }
    }
}