likes
comments
collection
share

Netty笔记-基于Netty UDP的消息广播与监控

作者站长头像
站长
· 阅读数 9

Netty是由JBOSS提供的一个java开源框架,它提供异步的、基于事件驱动的网络应用程序框架。Netty简化和流线化了JAVA网络应用的编程开发过程,在很多TOP级别的JAVA框架中都可以见到它的身影,其鼻祖级地位无法撼动。

本文主讲Netty UDP的消息广播与监控的代码实现, 为了让读者更易理解,本文按如下顺序阐述

  1. 前置的知识与概念:主要阐述 四层和七层协议TCP和UDP 的概念
  2. Netty中UDP广播相关接口与实现类 做一个简要的说明
  3. 实战代码部分:分为服务端与客户服,含代码实现功能的解析

前置知识

Netty官网: netty.io HTTP协议-02:四层和七层协议

什么是TCP和UDP

TCP是面向连接的传输,是指管理了两个端点之间的连接的建立,在连接的生命周期内的有序和可靠的消息传输及有序的终止。

UDP属于无连接协议,并无持久化连接的概念,每个消息(一个UDP数据包)都是一个单独的传输单元。UDP也无TCP的纠错机制,每个节点都将确认他们所接收到的包,而没有被确认的包将会被发送方重新传输。

UDP适用、优势与不足的分析

  • 有局限,但UDP高速于TCP;适用于那些能够处理或容忍消息丢失的应用程序(金融类的交易一定是不合适的)
  • 单播: 发送消息给一个由唯一地址所标识的单一的网络目的地,面向连接的协议和无连接协议都支持
  • 多播: 传输到一个预定的主机组
  • 广播: 传到网络(或子网)上所有的主机
  • 发布与订阅: 类似于syslog的应用程序将被归类为发布与订阅(一个生产者,多个接收者订阅消息)

Netty中UDP广播相关接口与实现类

  • interface AddressedEnvelope<M,A extends SocketAddress>: 定义一个消息,其包装了另一个消息并带有发送者和接收者地址。其中M是消息类型, A是地址类型
  • class DefaultAddressEnvelope<M,A extends SocketAddress> implements AddressedEnvelope<M,A: 提供了AddressedEnvelope默认实现
  • interface DatagramChannel extends Channe: 扩展了Netty的Channel抽象类以支持UDP的多播组管理
  • class NioDatagramChannel extends AbstractNioMessageChannel: 定义一个能发送或接收AddressedEnvelope消息的Channel类型
  • class DatagramPacket extends DefaultAddressEnvelope<ByteBuf,InetSocketAddress> implements ByteBufHolder
    • 扩展了DefaultAddressEnvelope以使用ByteBuf作为消息数据容器
    • DatagramPacket是一个简单的消息容器,DatagramChannel实现用它来和远程节点通信

实战代码

功能描述

  • 广播端(服务端):读取一个文件,将文件中的每一行当成一个消息广播到指定端口(注:该程序无身份认证、验证或加密,请读者自行添加)
  • 接收端(客户端):接收并处理消息

ChannelPipeline事件流

  • 本地: ChannelPipeline处理流程: LogEvent -> LogEventEncoder -> DataGramPacket
  • 广播多个远程节点:远程节点1,远程节点2,远程节点3....

服务端代码

LogEvent -- 定义消息组件

public final class LogEvent {

    public static final byte SEPARATOR=(byte)':';
    private final InetSocketAddress source;
    private final String logfile;
    private final String msg;
    private final long received;

    public LogEvent(String logfile, String msg ){
      this(null,logfile,msg,-1);
    }

    public LogEvent(InetSocketAddress source, String logfile, String msg, long received) {
        this.source = source;
        this.logfile = logfile;
        this.msg = msg;
        this.received = received;
    }

    public InetSocketAddress getSource() {
        return source;
    }

    public String getLogfile() {
        return logfile;
    }

    public String getMsg() {
        return msg;
    }

      public long getReceivedTimestamp(){
            return received;
      }
}

LogEventEncoder - 消息封装

/**
 * LogEvent的编解码器
 * 在将logevent转为DataGramPackage之前必须先进行编码
 */
public class LogEventEncoder extends MessageToMessageEncoder<LogEvent> {
    private final InetSocketAddress remoteAddress;

    /**
     * 创建即将被发送到指定的InetSocketAddress的DatagramPacket的消息
     * @param remoteAddress
     */
    public LogEventEncoder(InetSocketAddress remoteAddress) {
        this.remoteAddress = remoteAddress;
    }


    @Override
    protected void encode(ChannelHandlerContext ctx, LogEvent logEvent, List<Object> out) throws Exception {
        byte[] file = logEvent.getLogfile().getBytes(CharsetUtil.UTF_8);
        byte[] msg = logEvent.getMsg().getBytes(CharsetUtil.UTF_8);

        ByteBuf buf = ctx.alloc().buffer(file.length+msg.length+1);
        buf.writeBytes(file); //将文件写入到ByteBuf中
        buf.writeByte(LogEvent.SEPARATOR); //添加一个SEPARATOR
        buf.writeBytes(msg); //将日志消息写入到ByteBuf中

        //将一个拥有数据和目的地的新DatagramPacket添加到出站消息列表中
        out.add(new DatagramPacket(buf,remoteAddress));
    }
}

LogEventBroadcaster - 启动类

public class LogEventBroadcaster {
    private final EventLoopGroup group;
    private final Bootstrap bootstrap;
    private final File file;


    public LogEventBroadcaster(InetSocketAddress address, File file) {
        this.group = new NioEventLoopGroup();
        this.file = file;
        this.bootstrap = new Bootstrap();
        bootstrap.group(group)
            .channel(NioDatagramChannel.class)
            .option(ChannelOption.SO_BROADCAST,true) //设置SO_BROADCAST套接字选项
            .handler(new LogEventEncoder(address));
    }

    public void run() throws Exception{
        Channel ch = bootstrap.bind(0).sync().channel();
        long pointer = 0;
        for(;;){
            //启动主动循环
            long len = file.length();
            if (len<pointer){
                pointer = len; //将文件指针设置到该文件的最后一个字节
            }else if(len>pointer){
                RandomAccessFile raf = new RandomAccessFile(file,"r");
                raf.seek(pointer); //设置当前的文件指针,以确保没有任何旧日志被发送

                String line;
                while((line=raf.readLine())!=null){
                    ch.writeAndFlush(new LogEvent(null,file.getAbsolutePath(),line,-1));
                }
                pointer = raf.getFilePointer();
                raf.close();
            }
            try{
                Thread.sleep(1000); //1秒
            }catch (Exception e){
                //休眠1秒被中断,则退出循环,否则重新处理它
                Thread.interrupted();
                break;
            }
        }
    }


    public void stop(){
        group.shutdownGracefully();
    }

    /**
     * 第1个参数为端口
     * 第2个参数文件路径
     * @param args
     */
    public static void main(String[] args) throws Exception {
        if (args.length!=2){
            throw new IllegalArgumentException("请输入2个参数");
        }

        LogEventBroadcaster broadcaster = new LogEventBroadcaster(
                new InetSocketAddress("255.255.255.255",Integer.parseInt(args[0]))
                ,new File(args[1]));
        try{
            broadcaster.run();
        }finally {
            broadcaster.stop();
        }
    }

}

客户端-监控端

ClientLogEventEncoder - LogEvent的编解码器


/**
 * LogEvent的编解码器
 * 在将logevent转为DataGramPackage之前必须先进行编码
 */
public class ClientLogEventEncoder extends MessageToMessageDecoder<DatagramPacket> {
    @Override
    protected void decode(ChannelHandlerContext ctx, DatagramPacket packet, List<Object> out) throws Exception {
        ByteBuf data = packet.content();
        int idx = data.indexOf(0,data.readableBytes(),LogEvent.SEPARATOR);
        String filename = data.slice(0,idx).toString(CharsetUtil.UTF_8); //提取文件名
        String logMsg = data.slice(idx+1,data.readableBytes()).toString(CharsetUtil.UTF_8);
        LogEvent event = new LogEvent(packet.sender()
            ,filename,logMsg,System.currentTimeMillis()
        );
        out.add(event);
    }
}

ClientLogEventHandler - 消息处理


public class ClientLogEventHandler extends SimpleChannelInboundHandler<LogEvent> {

    @Override
    protected void channelRead0(ChannelHandlerContext ctx, LogEvent event) throws Exception {
        StringBuilder builder = new StringBuilder();
        builder.append(event.getReceivedTimestamp());
        builder.append("[");
        builder.append(event.getSource());
        builder.append("][");
        builder.append(event.getLogfile());
        builder.append("]");
        builder.append(event.getMsg());
        System.out.println(builder.toString()); //打印logEvent的数据
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        ctx.close();
    }

}

LogEventMonitor -- 启动程序


public class LogEventMonitor {
    private final EventLoopGroup group;
    private final Bootstrap bootstrap;

    public LogEventMonitor(InetSocketAddress address){
        group = new NioEventLoopGroup();
        bootstrap = new Bootstrap();
        bootstrap.group(group)
                .channel(NioDatagramChannel.class)
                .option(ChannelOption.SO_BROADCAST,true) //设置套接字SO_BROADCAST
                .handler(new ChannelInitializer<Channel>() {
                    @Override
                    protected void initChannel(Channel ch) throws Exception {
                        ChannelPipeline pipeline = ch.pipeline();
                        pipeline.addLast(new ClientLogEventEncoder());
                        pipeline.addLast(new ClientLogEventHandler());
                    }
                })
                .localAddress(address);
    }

    public Channel bind(){
        //绑定channel
        //DatagramChannel无连接
        return bootstrap.bind().syncUninterruptibly().channel();
    }

    public void stop(){
        group.shutdownGracefully();
    }

    public static void main(String[] args) throws InterruptedException {
        if (args.length!=1){
            throw  new IllegalArgumentException("Usage: LogEventMonitor");
        }

        LogEventMonitor monitor = new LogEventMonitor(
                new InetSocketAddress(Integer.parseInt(args[0]))
        );
        try{
            Channel channel = monitor.bind();
            System.out.println("LogEventMonitor running");
            channel.closeFuture().sync(); //阻塞等待服务端监听端口关闭。
        }finally {
            monitor.stop();
        }
    }
}

该文章是笔者两年前学习Netty时写的笔记文章,但段前的基础知识是本次新增的。

[参考]

转载自:https://juejin.cn/post/7302710832306782271
评论
请登录