项目中引入了Netty,让设备动起来了!😍人生道路,免不了各种各样的挑战,在面对挑战的时候,我们要保持良好的心态去面对
🌉 题记
人生道路,免不了各种各样的挑战,在面对挑战的时候,我们要保持良好的心态去面对。不管成功与否,也要积极面对,享受过程。
🎠 起缘
忘了具体是哪一天,只记得是一天的下午,刚小眯了一会,老大过来找到我说:“ 来活了 ”,我说:“ 那不挺好的吗? ”。让人闲下来的心可以变得忙一点起来,感觉还不错的样子。就这样,探讨了一番新的需求。
由于我们是做物联网方面的,核心就是接受设备上报过来的数据,服务端进行保存,实现数据展示,可以控制设备的开关、设备异常提醒等等,能达到硬件设备与软件服务器互联的效果。本次需求核心也是一样,主要第三方设备那边说用到Socket开发,需要我们做一个服务端进行接收数据,然后达到连接设备控制设备的效果。老大给我:“ 实现了,好给我加鸡腿,哈哈。” 随后我就着手准备开干。

找寻了资料,看到使用WebSocket比较多一点,然后就根据网上的教程进行搭建,通过在浏览器上测试是可以进行请求访问的。随后就把这个报给了老大,老大说,把这个上到服务器,让第三方设备端先测试连接一下。心里想着鸡腿有望了,哈哈!!!项目部署上了服务器,给第三方发了请求域名地址,结果那边说连不上,这下心态崩了😭,好不容易弄好的,结果用不成。
没办法,接着搞呗!那边是硬件,不是浏览器请求,或许是这种方式不支持,然后就找与硬件设备交互的Socket,结果让我找到了Netty,随之进行研究学习进行搭建服务。过了几天,给老大说:“ 换了一种方式,这下应该可以吧? ”这下我以最快的速度把代码上到服务器,给第三方发了请求域名地址,这次没让我失望,可以连上。这下应该鸡腿有下落了吧!!!
接下来的几天,主要和第三方进行测试连接,还有就是测试服务端反馈消息给客户端,这块需要按照人家给的格式进行处理。测试了几天之后,感觉差不多了,就可以开始编写自己的业务逻辑代码了。
好了,接下来,分享一下Springboot整合Netty。
🌈 开工
🏠 Netty介绍
Netty是一个基于NIO的客户端、服务端的编程框架,提供异步的、事件驱动的网络应用程序框架和工具。使用Netty可以确保快速地更简单的开发出一个网络应用。例如:基于TCP和UDP的socket服务开发。
🏡 引入
如果是maven项目的话,直接在pom文件中引入netty
依赖即可。
<!--Netty-->
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<version>4.1.68.Final</version>
</dependency>
🏫 配置
在application.yml中添加netty配置
,服务器地址的话,需要注意的是,在服务器上配置的话需要写成服务器内网ip才行。
# netty 配置
netty:
# 服务器端口
port: 8299
# 服务器地址
host: 192.168.0.108
🏢 编码
📕 Netty服务启动类(NettyServer)
配置Netty服务启动,并设置一些需要参数。
@Slf4j
@Component
public class NettyServer {
@Value("${netty.host}")
private String host;
@Value("${netty.port}")
private Integer port;
/**
* 主线程组
*/
private NioEventLoopGroup bossGroup = new NioEventLoopGroup(1);
/**
* 工作线程组
*/
private NioEventLoopGroup workerGroup = new NioEventLoopGroup(10);
/**
* 启动 netty 服务
*/
@PostConstruct
public void start() {
try {
ServerBootstrap bootstrap = new ServerBootstrap()
.channel(NioServerSocketChannel.class)
.group(bossGroup, workerGroup) // 绑定线程池
// 三次握手中的A、B队列总和最大值(第二次握手加入A, 第三次握手从A移动到B, accept 后从B取出)
.option(ChannelOption.SO_BACKLOG, 1024*10)
// 解决端口占用问题, 可以共用服务器端口(即使该端口已被其他端口占用)
.option(ChannelOption.SO_REUSEADDR, true)
// 接收消息缓冲区大小
.option(ChannelOption.SO_RCVBUF, 1024 * 1024 * 10)
// 发送消息缓冲区大小
.option(ChannelOption.SO_SNDBUF, 1024 * 1024 * 10)
// 用于启用或关于Nagle算法。如果要求高实时性,有数据发送时就马上发送,就将该选项设置为true关闭Nagle算法;
// 如果要减少发送次数减少网络交互,就设置为false等累积一定大小后再发送
.option(ChannelOption.TCP_NODELAY, true)
// 用于检测长时间没有数据传输的连接状态,当设置该选项以后,如果在两小时内没有数据的通信时,TCP会自动发送一个活动探测数据报文
.option(ChannelOption.SO_KEEPALIVE, true)
.option(ChannelOption.RCVBUF_ALLOCATOR, new FixedRecvByteBufAllocator(1024 * 10))
.childHandler(new NettyServerChannelInitializer())
// 当用户调用close()方法的时候,函数返回,在可能的情况下,尽量发送数据,不一定保证全部发送成功
// 使用SO_LINGER可以阻塞close()的调用时间,直到数据完全发送
.option(ChannelOption.SO_LINGER, 2000);
log.info("netty服务器开始监听端口" );
// 绑定端口,开始接收进来的连接
ChannelFuture future = bootstrap.bind(new InetSocketAddress(host,port)).sync();
if(future.isSuccess()){
log.info("NETTY服务启动成功");
}
} catch (Exception e) {
e.printStackTrace();
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
/**
* 销毁
*/
@PreDestroy
public void destroy(){
bossGroup.shutdownGracefully().syncUninterruptibly();
workerGroup.shutdownGracefully().syncUninterruptibly();
log.info("关闭 Netty 成功");
}
}
📗 Netty服务端初始化类(NettyServerChannelInitializer)
主要设置编码格式,设置读写超时时间,加载Netty服务端处理类
public class NettyServerChannelInitializer extends ChannelInitializer<SocketChannel> {
@Override
protected void initChannel(SocketChannel channel){
channel.pipeline().addLast("decoder",new StringDecoder(CharsetUtil.UTF_8));
channel.pipeline().addLast("encoder",new StringEncoder(CharsetUtil.UTF_8));
channel.pipeline().addLast(new IdleStateHandler(600, 600, 600, TimeUnit.SECONDS));
channel.pipeline().addLast(new NettyServerHandler());
}
}
📘 Netty服务端处理类(NettyServerHandler)
主要用来处理客户端发送过来的消息,并进行反馈。同时监听出现异常、超时情况等
@Slf4j
@Component
public class NettyServerHandler extends ChannelInboundHandlerAdapter {
@Autowired
private ReceiveDataService receiveDataService;
public static NettyServerHandler nettyServerHandler;
@PostConstruct
public void init() {
nettyServerHandler = this;
nettyServerHandler.receiveDataService = this.receiveDataService;
}
/**
* 管理一个全局map,保存连接进服务端的通道数量
*/
private static final ConcurrentHashMap<ChannelId, ChannelHandlerContext> CHANNEL_MAP = new ConcurrentHashMap<>();
/**
* @param ctx
* @return void
* @description:有客户端连接服务器会触发此函数
* @author: user
* @date: 2023-11-14 11:22:40
*/
@Override
public void channelActive(ChannelHandlerContext ctx) {
try {
InetSocketAddress insocket = (InetSocketAddress) ctx.channel().remoteAddress();
String clientIp = insocket.getAddress().getHostAddress();
int clientPort = insocket.getPort();
//获取连接通道唯一标识
ChannelId channelId = ctx.channel().id();
//如果map中不包含此连接,就保存连接
if (CHANNEL_MAP.containsKey(channelId)) {
log.info("NETTY-Socket-客户端【" + channelId + "】是连接状态,此时的连接通道数量: " + CHANNEL_MAP.size());
} else {
//保存连接
CHANNEL_MAP.put(channelId, ctx);
log.info("NETTY-Socket-客户端【" + channelId + "】连接netty服务器[IP:" + clientIp + "--->PORT:" + clientPort + "]" + ",此时的连接通道数量: " + CHANNEL_MAP.size());
}
} catch (Exception e) {
log.error("channelActive error:" + e.fillInStackTrace());
}
}
/**
* @param ctx
* @return void
* @description:有客户端终止连接服务器会触发此函数
* @author: user
* @date: 2023-11-14 11:24:48
*/
@Override
public void channelInactive(ChannelHandlerContext ctx) {
try {
InetSocketAddress insocket = (InetSocketAddress) ctx.channel().remoteAddress();
String clientIp = insocket.getAddress().getHostAddress();
ChannelId channelId = ctx.channel().id();
//包含此客户端才去删除
if (CHANNEL_MAP.containsKey(channelId)) {
//删除连接
CHANNEL_MAP.remove(channelId);
log.info("NETTY客户端【" + channelId + "】退出netty服务器[IP:" + clientIp + "--->PORT:" + insocket.getPort() + "]" + ",此时的连接通道数量: " + CHANNEL_MAP.size());
}
} catch (Exception e) {
log.error("channelInactive error:" + e.fillInStackTrace());
}
}
/**
* 有客户端发消息会触发此函数
*
* @param ctx
* @param msg
* @throws Exception
*/
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
if (msg == null) {
throw new Exception("NETTY-Socket加载客户端报文为空!");
}
log.info("NETTY-Socket-接收客户端报文【" + ctx.channel().id() + "】" + " :" + msg);
//给客户端发送消息
this.channelWrite(ctx.channel().id(),msg);
}
@Override
public void channelReadComplete(ChannelHandlerContext ctx) {
ctx.flush();
log.info("NETTY-Socket-【" + ctx.channel().id() + "】 数据接收完毕");
}
/**
* 服务端给客户端发送消息
*
* @param channelId 连接通道唯一id
* @param msg 需要发送的消息内容
* @throws Exception
*/
public void channelWrite(ChannelId channelId, Object msg) {
try {
ChannelHandlerContext ctx = CHANNEL_MAP.get(channelId);
if (ctx == null) {
log.info("NETTY-Socket-通道【" + channelId + "】不存在");
return;
}
if (msg == null || msg == "") {
log.info("NETTY-Socket-服务端响应空的消息");
return;
}
//将客户端的信息直接返回写入ctx,同时进行刷新
ctx.writeAndFlush(msg);
} catch (Exception e) {
log.error("channelWrite error:" + ExceptionUtil.getMessage(e));
}
}
/**
* 事件触发(读写超时、总超时)
* @param ctx
* @param evt
*/
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) {
try {
String socketString = ctx.channel().remoteAddress().toString();
if (evt instanceof IdleStateEvent) {
IdleStateEvent event = (IdleStateEvent) evt;
if (event.state() == IdleState.READER_IDLE) {
log.info("userEventTriggered-NETTY-Client: " + socketString + " READER_IDLE 读超时");
ctx.disconnect();
} else if (event.state() == IdleState.WRITER_IDLE) {
log.info("userEventTriggered-NETTY-Client: " + socketString + " WRITER_IDLE 写超时");
ctx.disconnect();
} else if (event.state() == IdleState.ALL_IDLE) {
log.info("userEventTriggered-NETTY-Client: " + socketString + " ALL_IDLE 总超时");
ctx.disconnect();
}
}
} catch (Exception e) {
log.error("userEventTriggered error:" + ExceptionUtil.getMessage(e));
}
}
/**
* 发生异常会触发此函数
* @param ctx
* @param cause
* @throws Exception
*/
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
log.error("exceptionCaught error:" + cause.fillInStackTrace());
cause.printStackTrace();
ctx.close();
log.error("NETTY-Socket:" + ctx.channel().id() + " 发生了错误,此连接被关闭" + "此时连通数量: " + CHANNEL_MAP.size());
}
}
📚 总结
以上就是我进行Netty开发的一些实例代码,也是一次不错的提升。
如果大家有需要的话,可以进行参考。由于本人能力有限,如有不对的地方,还需指正。
🎡 展望
人生在世,短短几十年,保持积极向上的心态去对待每一天,每一天都是一个新的开始,加油!!!
人生是一场很美妙并且很奇妙的旅行,在这个过程中每前进一步都充满无尽的可能与希望。

转载自:https://juejin.cn/post/7395849559388127243