likes
comments
collection
share

项目中引入了Netty,让设备动起来了!😍人生道路,免不了各种各样的挑战,在面对挑战的时候,我们要保持良好的心态去面对

作者站长头像
站长
· 阅读数 61

🌉 题记

人生道路,免不了各种各样的挑战,在面对挑战的时候,我们要保持良好的心态去面对。不管成功与否,也要积极面对,享受过程。

🎠 起缘

忘了具体是哪一天,只记得是一天的下午,刚小眯了一会,老大过来找到我说:“ 来活了 ”,我说:“ 那不挺好的吗? ”。让人闲下来的心可以变得忙一点起来,感觉还不错的样子。就这样,探讨了一番新的需求。

由于我们是做物联网方面的,核心就是接受设备上报过来的数据,服务端进行保存,实现数据展示,可以控制设备的开关、设备异常提醒等等,能达到硬件设备与软件服务器互联的效果。本次需求核心也是一样,主要第三方设备那边说用到Socket开发,需要我们做一个服务端进行接收数据,然后达到连接设备控制设备的效果。老大给我:“ 实现了,好给我加鸡腿,哈哈。” 随后我就着手准备开干。

项目中引入了Netty,让设备动起来了!😍人生道路,免不了各种各样的挑战,在面对挑战的时候,我们要保持良好的心态去面对

找寻了资料,看到使用WebSocket比较多一点,然后就根据网上的教程进行搭建,通过在浏览器上测试是可以进行请求访问的。随后就把这个报给了老大,老大说,把这个上到服务器,让第三方设备端先测试连接一下。心里想着鸡腿有望了,哈哈!!!项目部署上了服务器,给第三方发了请求域名地址,结果那边说连不上,这下心态崩了😭,好不容易弄好的,结果用不成。

项目中引入了Netty,让设备动起来了!😍人生道路,免不了各种各样的挑战,在面对挑战的时候,我们要保持良好的心态去面对

没办法,接着搞呗!那边是硬件,不是浏览器请求,或许是这种方式不支持,然后就找与硬件设备交互的Socket,结果让我找到了Netty,随之进行研究学习进行搭建服务。过了几天,给老大说:“ 换了一种方式,这下应该可以吧? ”这下我以最快的速度把代码上到服务器,给第三方发了请求域名地址,这次没让我失望,可以连上。这下应该鸡腿有下落了吧!!!

项目中引入了Netty,让设备动起来了!😍人生道路,免不了各种各样的挑战,在面对挑战的时候,我们要保持良好的心态去面对

接下来的几天,主要和第三方进行测试连接,还有就是测试服务端反馈消息给客户端,这块需要按照人家给的格式进行处理。测试了几天之后,感觉差不多了,就可以开始编写自己的业务逻辑代码了。

好了,接下来,分享一下Springboot整合Netty。

🌈 开工

🏠 Netty介绍

Netty是一个基于NIO的客户端、服务端的编程框架,提供异步的、事件驱动的网络应用程序框架和工具。使用Netty可以确保快速地更简单的开发出一个网络应用。例如:基于TCP和UDP的socket服务开发。

🏡 引入

如果是maven项目的话,直接在pom文件中引入netty依赖即可。

<!--Netty-->
<dependency>
    <groupId>io.netty</groupId>
    <artifactId>netty-all</artifactId>
    <version>4.1.68.Final</version>
</dependency>

🏫 配置

application.yml中添加netty配置,服务器地址的话,需要注意的是,在服务器上配置的话需要写成服务器内网ip才行。

# netty 配置
netty:
  # 服务器端口
  port: 8299
  # 服务器地址
  host: 192.168.0.108

🏢 编码

📕 Netty服务启动类(NettyServer)

配置Netty服务启动,并设置一些需要参数。

@Slf4j
@Component
public class NettyServer {
    @Value("${netty.host}")
    private String host;

    @Value("${netty.port}")
    private Integer port;
    /**
     * 主线程组
     */
    private  NioEventLoopGroup bossGroup = new NioEventLoopGroup(1);

    /**
     * 工作线程组
     */
    private  NioEventLoopGroup workerGroup = new NioEventLoopGroup(10);

    /**
     * 启动 netty 服务
     */
    @PostConstruct
    public void start() {
        try {
            ServerBootstrap bootstrap = new ServerBootstrap()
                    .channel(NioServerSocketChannel.class)
                    .group(bossGroup, workerGroup)  // 绑定线程池
                    // 三次握手中的A、B队列总和最大值(第二次握手加入A, 第三次握手从A移动到B, accept 后从B取出)
                    .option(ChannelOption.SO_BACKLOG, 1024*10)
                    // 解决端口占用问题, 可以共用服务器端口(即使该端口已被其他端口占用)
                    .option(ChannelOption.SO_REUSEADDR, true)
                    // 接收消息缓冲区大小
                    .option(ChannelOption.SO_RCVBUF, 1024 * 1024 * 10)
                    // 发送消息缓冲区大小
                    .option(ChannelOption.SO_SNDBUF, 1024 * 1024 * 10)
                    // 用于启用或关于Nagle算法。如果要求高实时性,有数据发送时就马上发送,就将该选项设置为true关闭Nagle算法;
                    // 如果要减少发送次数减少网络交互,就设置为false等累积一定大小后再发送
                    .option(ChannelOption.TCP_NODELAY, true)
                    // 用于检测长时间没有数据传输的连接状态,当设置该选项以后,如果在两小时内没有数据的通信时,TCP会自动发送一个活动探测数据报文
                    .option(ChannelOption.SO_KEEPALIVE, true)
                    .option(ChannelOption.RCVBUF_ALLOCATOR, new FixedRecvByteBufAllocator(1024 * 10))
                    .childHandler(new NettyServerChannelInitializer())
                    // 当用户调用close()方法的时候,函数返回,在可能的情况下,尽量发送数据,不一定保证全部发送成功
                    // 使用SO_LINGER可以阻塞close()的调用时间,直到数据完全发送
                    .option(ChannelOption.SO_LINGER, 2000);
            log.info("netty服务器开始监听端口" );
            // 绑定端口,开始接收进来的连接
            ChannelFuture future = bootstrap.bind(new InetSocketAddress(host,port)).sync();
            if(future.isSuccess()){
                log.info("NETTY服务启动成功");
            }
        } catch (Exception e) {
            e.printStackTrace();
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }

    /**
     * 销毁
     */
    @PreDestroy
    public void destroy(){
        bossGroup.shutdownGracefully().syncUninterruptibly();
        workerGroup.shutdownGracefully().syncUninterruptibly();
        log.info("关闭 Netty 成功");
    }
}

📗 Netty服务端初始化类(NettyServerChannelInitializer)

主要设置编码格式,设置读写超时时间,加载Netty服务端处理类

public class NettyServerChannelInitializer extends ChannelInitializer<SocketChannel> {
    @Override
    protected void initChannel(SocketChannel channel){
        channel.pipeline().addLast("decoder",new StringDecoder(CharsetUtil.UTF_8));
        channel.pipeline().addLast("encoder",new StringEncoder(CharsetUtil.UTF_8));
        channel.pipeline().addLast(new IdleStateHandler(600, 600, 600, TimeUnit.SECONDS));
        channel.pipeline().addLast(new NettyServerHandler());
    }
}

📘 Netty服务端处理类(NettyServerHandler)

主要用来处理客户端发送过来的消息,并进行反馈。同时监听出现异常、超时情况等

@Slf4j
@Component
public class NettyServerHandler extends ChannelInboundHandlerAdapter {
    @Autowired
    private ReceiveDataService receiveDataService;
    public static NettyServerHandler nettyServerHandler;

    @PostConstruct
    public void init() {
        nettyServerHandler = this;
        nettyServerHandler.receiveDataService = this.receiveDataService;
    }

    /**
     * 管理一个全局map,保存连接进服务端的通道数量
     */
    private static final ConcurrentHashMap<ChannelId, ChannelHandlerContext> CHANNEL_MAP = new ConcurrentHashMap<>();

    /**
     * @param ctx
     * @return void
     * @description:有客户端连接服务器会触发此函数
     * @author: user
     * @date: 2023-11-14 11:22:40
     */
    @Override
    public void channelActive(ChannelHandlerContext ctx) {
        try {
            InetSocketAddress insocket = (InetSocketAddress) ctx.channel().remoteAddress();
            String clientIp = insocket.getAddress().getHostAddress();
            int clientPort = insocket.getPort();
            //获取连接通道唯一标识
            ChannelId channelId = ctx.channel().id();
            //如果map中不包含此连接,就保存连接
            if (CHANNEL_MAP.containsKey(channelId)) {
                log.info("NETTY-Socket-客户端【" + channelId + "】是连接状态,此时的连接通道数量: " + CHANNEL_MAP.size());
            } else {
                //保存连接
                CHANNEL_MAP.put(channelId, ctx);
                log.info("NETTY-Socket-客户端【" + channelId + "】连接netty服务器[IP:" + clientIp + "--->PORT:" + clientPort + "]" + ",此时的连接通道数量: " + CHANNEL_MAP.size());
            }
        } catch (Exception e) {
            log.error("channelActive error:" + e.fillInStackTrace());
        }

    }

    /**
     * @param ctx
     * @return void
     * @description:有客户端终止连接服务器会触发此函数
     * @author: user
     * @date: 2023-11-14 11:24:48
     */
    @Override
    public void channelInactive(ChannelHandlerContext ctx) {
        try {
            InetSocketAddress insocket = (InetSocketAddress) ctx.channel().remoteAddress();
            String clientIp = insocket.getAddress().getHostAddress();
            ChannelId channelId = ctx.channel().id();
            //包含此客户端才去删除
            if (CHANNEL_MAP.containsKey(channelId)) {
                //删除连接
                CHANNEL_MAP.remove(channelId);
                log.info("NETTY客户端【" + channelId + "】退出netty服务器[IP:" + clientIp + "--->PORT:" + insocket.getPort() + "]" + ",此时的连接通道数量: " + CHANNEL_MAP.size());
            }
        } catch (Exception e) {
            log.error("channelInactive error:" + e.fillInStackTrace());
        }
    }

    /**
     * 有客户端发消息会触发此函数
     *
     * @param ctx
     * @param msg
     * @throws Exception
     */
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        if (msg == null) {
            throw new Exception("NETTY-Socket加载客户端报文为空!");
        }
        log.info("NETTY-Socket-接收客户端报文【" + ctx.channel().id() + "】" + " :" + msg);
        //给客户端发送消息
        this.channelWrite(ctx.channel().id(),msg);
    }

    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) {
        ctx.flush();
        log.info("NETTY-Socket-【" + ctx.channel().id() + "】 数据接收完毕");
    }

    /**
     * 服务端给客户端发送消息
     *
     * @param channelId 连接通道唯一id
     * @param msg       需要发送的消息内容
     * @throws Exception
     */
    public void channelWrite(ChannelId channelId, Object msg) {
        try {
            ChannelHandlerContext ctx = CHANNEL_MAP.get(channelId);
            if (ctx == null) {
                log.info("NETTY-Socket-通道【" + channelId + "】不存在");
                return;
            }
            if (msg == null || msg == "") {
                log.info("NETTY-Socket-服务端响应空的消息");
                return;
            }
            //将客户端的信息直接返回写入ctx,同时进行刷新
            ctx.writeAndFlush(msg);
        } catch (Exception e) {
            log.error("channelWrite error:" + ExceptionUtil.getMessage(e));
        }
    }


    /**
     * 事件触发(读写超时、总超时)
     * @param ctx
     * @param evt
     */
    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) {
        try {
            String socketString = ctx.channel().remoteAddress().toString();
            if (evt instanceof IdleStateEvent) {
                IdleStateEvent event = (IdleStateEvent) evt;
                if (event.state() == IdleState.READER_IDLE) {
                    log.info("userEventTriggered-NETTY-Client: " + socketString + " READER_IDLE 读超时");
                    ctx.disconnect();
                } else if (event.state() == IdleState.WRITER_IDLE) {
                    log.info("userEventTriggered-NETTY-Client: " + socketString + " WRITER_IDLE 写超时");
                    ctx.disconnect();
                } else if (event.state() == IdleState.ALL_IDLE) {
                    log.info("userEventTriggered-NETTY-Client: " + socketString + " ALL_IDLE 总超时");
                    ctx.disconnect();
                }
            }
        } catch (Exception e) {
            log.error("userEventTriggered error:" + ExceptionUtil.getMessage(e));
        }
    }

    /**
     * 发生异常会触发此函数
     * @param ctx
     * @param cause
     * @throws Exception
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        log.error("exceptionCaught error:" + cause.fillInStackTrace());
        cause.printStackTrace();
        ctx.close();
        log.error("NETTY-Socket:" + ctx.channel().id() + " 发生了错误,此连接被关闭" + "此时连通数量: " + CHANNEL_MAP.size());
    }
}

📚 总结

以上就是我进行Netty开发的一些实例代码,也是一次不错的提升。

如果大家有需要的话,可以进行参考。由于本人能力有限,如有不对的地方,还需指正。

🎡 展望

人生在世,短短几十年,保持积极向上的心态去对待每一天,每一天都是一个新的开始,加油!!!

人生是一场很美妙并且很奇妙的旅行,在这个过程中每前进一步都充满无尽的可能与希望。

项目中引入了Netty,让设备动起来了!😍人生道路,免不了各种各样的挑战,在面对挑战的时候,我们要保持良好的心态去面对
转载自:https://juejin.cn/post/7395849559388127243
评论
请登录