『Netty核心』Netty心跳机制
「这是我参与11月更文挑战的第28天,活动详情查看:2021最后一次更文挑战」。
点赞再看,养成习惯👏👏
心跳检测机制
所谓心跳,即在 TCP 长连接中,客户端和服务器之间定期发送的一种特殊的数据包,通知对方自己还在线,以确保 TCP 连接的有效性。
心跳检测机制:客户端每隔一段时间发送PING消息给服务端,服务端接受到后回复PONG消息。客户端如果在一定时间内没有收到PONG响应,则认为连接断开,服务端如果在一定时间内没有收到来自客户端的PING请求,则认为连接已经断开。通过这种来回的PING-PONG消息机制侦测连接的活跃性。
在 Netty 中,本身也提供了 IdleStateHandler
用于检测连接闲置,该Handler可以检测连接未发生读写事件而触发相应事件。
看下它的构造器:
public IdleStateHandler(int readerIdleTimeSeconds, int writerIdleTimeSeconds, int allIdleTimeSeconds) {
this((long)readerIdleTimeSeconds, (long)writerIdleTimeSeconds, (long)allIdleTimeSeconds, TimeUnit.SECONDS);
}
解释下三个参数的含义:
- eaderIdleTimeSeconds 读超时:当在指定的时间间隔内没有从
Channel
读取到数据时,会触发一个READER_IDLE
的IdleStateEvent
事件。 - riterIdleTimeSeconds 写超时:即当在指定的时间间隔内没有数据写入到
Channel
时,会触发一个WRITER_IDLE
的IdleStateEvent
事件。 - llIdleTimeSeconds 读/写超时:即当在指定的时间间隔内没有读或写操作时,会触发一个
ALL_IDLE
的IdleStateEvent
事件。
注:这三个参数默认的时间单位是秒。若需要指定其他时间单位,可以使用另一个构造方法:
public IdleStateHandler(long readerIdleTime, long writerIdleTime, long allIdleTime, TimeUnit unit) {
this(false, readerIdleTime, writerIdleTime, allIdleTime, unit);
}
要实现 Netty 服务端心跳检测机制需要在服务器端的 ChannelInitializer
中加入如下的代码:
pipeline.addLast(new IdleStateHandler(3, 0, 0, TimeUnit.SECONDS));
初步地看下 IdleStateHandler
源码,先看下 IdleStateHandler 中的 channelRead
方法:
红框代码其实表示该方法只是进行了透传,不做任何业务逻辑处理,让 channelPipe 中的下一个 handler 处理channelRead 方法
我们再看看 channelActive
方法:
这里有个 initialize
的方法,这是 IdleStateHandler
的精髓,接着探究:
这边会触发一个 Task,ReaderIdleTimeoutTask
,这个task里的 run 方法源码是这样的:
第一个红框代码是用当前时间减去最后一次 channelRead
方法调用的时间,假如这个结果是6s,说明最后一次调用channelRead 已经是6s之前的事情了,你设置的是5s,那么 nextDelay
则为-1,说明超时了,那么第二个红框代码则会触发下一个 handler 的 userEventTriggered
方法:
如果没有超时则不触发 userEventTriggered
方法。
服务端代码
public class HeartBeatServer {
public static void main(String[] args) throws InterruptedException {
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
EventLoopGroup workGroup = new NioEventLoopGroup();
try{
ServerBootstrap serverBootstrap = new ServerBootstrap();
serverBootstrap.group(bossGroup,workGroup)
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
ChannelPipeline channelPipeline = socketChannel.pipeline();
channelPipeline.addLast("decoder",new StringDecoder());
channelPipeline.addLast("encoder",new StringEncoder());
channelPipeline.addLast(new IdleStateHandler(3,0,0, TimeUnit.SECONDS));
channelPipeline.addLast(new HeartBeatServerHandler());
}
});
System.out.println("netty server start。。");
ChannelFuture channelFuture = serverBootstrap.bind(8888).sync();
channelFuture.channel().closeFuture().sync();
}finally {
bossGroup.shutdownGracefully();
workGroup.shutdownGracefully();
}
}
}
继承 SimpleChannelInboundHandler
重写 channelRead
方法和 userEventTriggered
方法
public class HeartBeatServerHandler extends SimpleChannelInboundHandler<String> {
int readIdleTimes = 0;
@Override
protected void channelRead0(ChannelHandlerContext ctx, String s) throws Exception {
System.out.println(" ====== > [server] message received : " + s);
if ("Heartbeat Packet".equals(s)){
ctx.channel().writeAndFlush("ok");
}else{
System.out.println(" 其他信息处理...");
}
}
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
IdleStateEvent event = (IdleStateEvent) evt;
String eventType = null;
switch (event.state()){
case READER_IDLE:
eventType = "读空闲";
readIdleTimes++; // 读空闲的计数加1
break;
case WRITER_IDLE:
eventType = "写空闲";
// 不处理
break;
case ALL_IDLE:
eventType = "读写空闲";
// 不处理
break;
}
System.out.println(ctx.channel().remoteAddress() + "超时事件:" + eventType);
if (readIdleTimes > 3){
System.out.println(" [server]读空闲超过3次,关闭连接,释放更多资源");
ctx.channel().writeAndFlush("idle close");
ctx.channel().close();
}
}
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
System.err.println("=== " + ctx.channel().remoteAddress() + " is active ===");
}
}
客户端代码
public class HeartBeatClient {
public static void main(String[] args) throws InterruptedException {
EventLoopGroup group = new NioEventLoopGroup();
try{
Bootstrap bootstrap = new Bootstrap();
bootstrap.group(group)
.channel(NioSocketChannel.class)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
ChannelPipeline channelPipeline = socketChannel.pipeline();
channelPipeline.addLast("decoder",new StringDecoder());
channelPipeline.addLast("encoder",new StringEncoder());
channelPipeline.addLast(new HeartBeatClientHandler());
}
});
System.out.println("netty client start。。");
Channel channel = bootstrap.connect("127.0.0.1", 8888).sync().channel();
String text = "Heartbeat Packet";
Random random = new Random();
while(channel.isActive()){
int num = random.nextInt(8);
Thread.sleep(num*1000);
channel.writeAndFlush(text);
}
}catch (Exception e){
e.printStackTrace();
}finally {
group.shutdownGracefully();
}
}
static class HeartBeatClientHandler extends SimpleChannelInboundHandler<String>{
@Override
protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {
System.out.println(" client received :" + msg);
if (msg != null && msg.equals("idle close")) {
System.out.println(" 服务端关闭连接,客户端也关闭");
ctx.channel().closeFuture();
}
}
}
}
转载自:https://juejin.cn/post/7035062415901327396