likes
comments
collection
share

【Netty】「优化进阶」(三)Netty 通信协议设计:从 Redis、HTTP 和自定义协议看起

作者站长头像
站长
· 阅读数 10

前言

本篇博文是《从0到1学习 Netty》中进阶系列的第三篇博文,主要内容是从 Redis、HTTP 和自定义协议三个方面来探讨了 Netty 通信协议的设计,结合应用案例加深理解,根据实际情况优化协议,往期系列文章请访问博主的 Netty 专栏,博文中的所有代码全部收集在博主的 GitHub 仓库中;

介绍

当今互联网软件系统中,常常需要使用多种协议进行通信。例如,在一个分布式存储系统中,可能需要同时支持 Redis 协议、HTTP 协议以及自定义协议等。而对于这些不同的协议,如何实现跨协议通信也成为了亟待解决的问题。

Netty 作为一种高性能的网络通信框架,在处理不同协议的通信方面具有很大优势。在使用 Netty 实现跨协议通信之前,我们首先需要进行协议特征的分析和比较。我们以 Redis 协议、HTTP 协议和自定义协议为例:

  1. Redis 协议:Redis 协议是一种基于 TCP 连接的二进制协议,用于与 Redis 数据库进行交互。它采用简单的请求/响应模型,并且支持异步执行命令(通过 MULTI / EXEC 命令)。
  2. HTTP 协议:HTTP 协议是一种基于 TCP 连接的文本协议,用于 Web 服务中的客户端-服务器通信。它采用请求/响应模型,并且支持状态码、HeaderCookie 等功能。
  3. 自定义协议:自定义协议是指根据业务需求自定义的协议。它可以是基于二进制格式或者文本格式,通常需要定义消息头、消息体以及校验码等字段。

通过以上分析,我们可以看出不同协议的特点和差异。在使用 Netty 实现跨协议通信时,需要根据每种协议的特点进行针对性开发,下面将介绍如何使用 Netty 实现跨协议通信。

Redis 协议实现

Redis 使用一种基于文本的协议来进行与客户端的通信,该协议被称为 RESP(REdis Serialization Protocol)。

RESP 协议定义了一组规则和格式,用于描述在 Redis 服务器和 Redis 客户端之间交换数据的方式。它支持多种数据类型,包括字符串、数字、数组和错误消息。RESP 协议采用简单而直观的格式,以提高通信效率和可读性。例如,对于字符串类型,RESP协议使用以下格式:

+OK\r\n

其中,"+" 表示状态回复,"OK" 表示字符串内容,"\r\n" 表示行结束符。这个例子中的字符串只包含一个单词,但 RESP 协议同样适用于包含多个单词或者更复杂结构的字符串。如果在此之前对 Redis 没有了解,欢迎移步博主的 Redis 专栏

例如,我们给 Redis 发送一条指令,新增一个键值对 set name sidiot,那么需要遵守以下协议:

# 由于 set name sidiot 指令一共有 3 部分,因此用 *3 表示
*3\r\n
# 第一个指令的长度是 3
$3\r\n
# 第一个指令是 set 指令
set\r\n
# 以此类推
$4\r\n name\r\n
$6\r\n sidiot\r\n

注意,每条指令之后都要添加回车与换行符 \r\n

测试代码:

@Override  
public void channelActive(ChannelHandlerContext ctx) throws Exception {  
    ByteBuf buffer = ctx.alloc().buffer();  
    String command = "*3\r\n" +  
            "$3\r\nSET\r\n" +  
            "$4\r\nname\r\n" +  
            "$6\r\nsidiot\r\n";  
    buffer.writeBytes(command.getBytes());  
    ctx.writeAndFlush(buffer);  
}

运行结果:

【Netty】「优化进阶」(三)Netty 通信协议设计:从 Redis、HTTP 和自定义协议看起

再例如使用 GET 指令,获取刚刚新增的键值对:

【Netty】「优化进阶」(三)Netty 通信协议设计:从 Redis、HTTP 和自定义协议看起

或者组合使用指令:

【Netty】「优化进阶」(三)Netty 通信协议设计:从 Redis、HTTP 和自定义协议看起

需要完整代码的读者请访问博主的 Github:TestRedis.java

HTTP 协议实现

HTTP 是一种常用的网络协议,它定义了客户端和服务器之间如何进行通信。在 HTTP 通信中,请求行和请求头包含了大量的信息,这些信息对于处理 HTTP 请求来说非常重要。然而,手动实现 HTTP 请求的解析和编码是一项相当复杂的任务。

为了简化这个过程,可以使用 HttpServerCodec 作为服务器端的解码器与编码器,来处理 HTTP 请求,它能够将 HTTP 请求解析为可读的数据,并将响应数据编码为 HTTP 格式。

需要注意的是,如果类名上出现的是 Codec,那表示这个类既有解码又有编码。例如 HttpServerCodec 类,它就包括了 HttpRequestDecoderHttpResponseEncoder,源码如下所示:

public final class HttpServerCodec extends CombinedChannelDuplexHandler<HttpRequestDecoder, HttpResponseEncoder>  
        implements HttpServerUpgradeHandler.SourceCodec { ... }

测试代码:

@Override  
protected void initChannel(SocketChannel ch) {  
    ch.pipeline().addLast(new LoggingHandler(LogLevel.DEBUG));  
    ch.pipeline().addLast(new HttpServerCodec());  
    ch.pipeline().addLast(new ChannelInboundHandlerAdapter() {  
        @Override  
        public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {  
            log.debug("{}", msg.getClass());  
        }  
    });  
}

运行结果:

【Netty】「优化进阶」(三)Netty 通信协议设计:从 Redis、HTTP 和自定义协议看起

同时,日志输出了两个类:DefaultHttpRequestLastHttpContent,这是因为由于 HttpServerCodec 将请求解析成了两个部分。其中,DefaultHttpRequest 包含了请求行和请求头,LastHttpContent 表示的是请求体:

17:24:49 [DEBUG] [nioEventLoopGroup-3-1] c.s.n.c4.TestHTTP - class io.netty.handler.codec.http.DefaultHttpRequest
17:24:49 [DEBUG] [nioEventLoopGroup-3-1] c.s.n.c4.TestHTTP - class io.netty.handler.codec.http.LastHttpContent$1

我们可以使用 SimpleChannelInboundHandler 来专注于某一种类型的消息,比如 HttpRequestSimpleChannelInboundHandler 是 Netty 中的一个入站处理程序,用于处理接收到的数据,它是一种特殊类型的 ChannelInboundHandler,可以自动释放消息资源,防止内存泄漏。

测试代码:

ch.pipeline().addLast(new SimpleChannelInboundHandler<HttpRequest>() {  
    @Override  
    protected void channelRead0(ChannelHandlerContext ctx, HttpRequest msg) {  
        // 获得请求uri  
        log.debug(msg.uri());  
        // 获得完整响应,设置版本号与状态码  
        DefaultFullHttpResponse response = new DefaultFullHttpResponse(msg.protocolVersion(), HttpResponseStatus.OK);  
        // 设置响应内容  
        byte[] bytes = "<h1>Hello, sidiot!</h1>".getBytes(StandardCharsets.UTF_8);  
        // 设置响应体  
        response.content().writeBytes(bytes);  
        // 写回响应  
        ctx.writeAndFlush(response);  
    }  
});

运行结果:

【Netty】「优化进阶」(三)Netty 通信协议设计:从 Redis、HTTP 和自定义协议看起

这里浏览器一直在转动的原因是,我们并没有告诉浏览器响应数据的长度是多少,因此浏览器认为后续还会有数据到来,所以一直处于等待状态,我们仅需设置响应体长度即可,代码如下:

import static io.netty.handler.codec.http.HttpHeaderNames.CONTENT_LENGTH;

response.headers().setInt(CONTENT_LENGTH, bytes.length);

运行结果:

【Netty】「优化进阶」(三)Netty 通信协议设计:从 Redis、HTTP 和自定义协议看起

总的来说,在服务器收到来自浏览器的 HTTP 请求后,它需要向浏览器返回一个响应。因此,使用 DefaultFullHttpResponse 类创建一个响应对象,在创建响应对象时,需要设置 HTTP 协议版本号和状态码来表示服务器处理该请求的结果。

此外,为了避免浏览器在接收到响应后一直处于等待状态,我们需要通过添加 CONTENT_LENGTH 字段来指定响应正文的长度,以便浏览器知道何时可以停止等待并开始处理响应数据。

因此,在构建 HTTP 响应时,必须确保包括正确的 HTTP 版本号、状态码和内容长度信息,以确保服务器和客户端之间的通信正确无误。

需要完整代码的读者请访问博主的 Github:TestHTTP.java

自定义协议实现

除了 Redis 和 HTTP 协议,我们还可以使用自定义协议来实现 Netty 应用程序之间的通信。自定义协议可以根据应用程序的特定需求进行设计,从而使得 Netty 应用程序的通信更加高效和安全。

自定义协议一般由以下要素组成:

  1. 魔数:魔数是一个特定的数字或字符串,在数据包的开头位置出现,作为识别标志。接收方可以根据魔数判断数据包是否有效,如果无效则直接丢弃,从而提高通信效率和安全性。

  2. 版本号:版本号表示自定义协议的版本信息,当协议发生变化时,可以通过版本号来区分不同的协议版本,这样旧版本的客户端和服务器也能够兼容,即使协议发生变化也能正确处理数据。

  3. 序列化算法:序列化算法指的是将消息正文转换为二进制数据的方式。因为网络传输只能传输二进制数据,所以需要将消息正文序列化为二进制数据,发送方在发送数据时需要对消息正文进行序列化,接收方在接收数据时需要对消息正文进行反序列化,才能正确地还原消息。序列化方法有 json、protobuf、hessian、jdk 等。

  4. 指令类型:指令类型表示发送方要执行的具体业务操作,例如登录、注册、单聊、群聊等,接收方可以根据指令类型来分发消息,将不同的消息转发给相应的业务处理模块。

  5. 请求序号:请求序号是发送方用来标识一个请求的唯一标识符,接收方在返回响应时会携带相同的请求序号,以便发送方能够正确地将响应和请求匹配起来。请求序号还可以用于实现异步通信,发送方可以通过请求序号来判断是否收到了对应的响应,从而实现异步能力。

  6. 正文长度:消息正文的长度,用于接收方正确地读取数据。由于网络传输中数据包大小是有限制的,因此发送方需要对消息正文的长度进行限制,同时也需要将消息正文的长度信息发送给接收方,以便接收方能够正确地读取数据。

  7. 消息正文:包含具体的业务信息。消息正文是自定义协议中最重要的部分,它包含具体的业务信息,例如用户的登录信息、聊天内容等。


接下来以聊天室为业务场景,获取相关业务消息请访问博主的 Github:Message

创建 MessageCodec 类,继承 ByteToMessageCodec 类,它实现了将字节流转换为消息对象并进行解码的功能,该类是一个抽象类,需要通过继承并实现其中的抽象方法来完成具体的解码逻辑。

在实现过程中,需要重写以下两个方法:

  • decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out):该方法表示从入站字节流 ByteBuf 中解码出消息对象,并将结果存储到出站列表 List<Object> 中。在该方法中需要完成字节流的读取和消息对象的构建工作。代码实现如下:

    @Override  
    public void encode(ChannelHandlerContext ctx, Message msg, ByteBuf out) throws Exception { 
        // 设置魔数  
        out.writeBytes("IDIOT".getBytes());  
        // 设置版本号  
        out.writeByte(1);  
        // 设置序列化方式  
        out.writeByte(1);  
        // 设置指令类型  
        out.writeByte(msg.getMessageType());  
        // 设置请求序号  
        out.writeInt(msg.getSequenceId());  
    
        // 获得序列化后的 msg  
        ByteArrayOutputStream bos = new ByteArrayOutputStream();  
        ObjectOutputStream oos = new ObjectOutputStream(bos);  
        oos.writeObject(msg);  
        byte[] bytes = bos.toByteArray();  
    
        // 获得并设置正文长度
        out.writeInt(bytes.length);  
        // 设置消息正文  
        out.writeBytes(bytes);  
    }
    
  • encode(ChannelHandlerContext ctx, Message msg, ByteBuf out):该方法表示将消息对象编码成字节流并写入出站 ByteBuf 中。在该方法中需要根据不同的协议规范将消息对象转换为字节流,并写入 ByteBuf 中。代码实现如下:

    @Override  
    protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {  
        // 获取魔数  
        byte[] magic = new byte[5];  
        in.readBytes(magic, 0, 5);  
        // 获取版本号  
        byte version = in.readByte();  
        // 获得序列化方式  
        byte seqType = in.readByte();  
        // 获得指令类型  
        byte messageType = in.readByte();  
        // 获得请求序号  
        int sequenceId = in.readInt();  
        // 获得正文长度  
        int length = in.readInt();  
        // 获得正文  
        byte[] bytes = new byte[length];  
        in.readBytes(bytes, 0, length);  
        ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(bytes));  
        Message message = (Message) ois.readObject();  
        // 将信息放入 List 中,传递给下一个 handler  
        out.add(message);
    }
    

测试代码:

public class TestMessageCodec {  
    public static void main(String[] args) throws Exception {  
        EmbeddedChannel channel = new EmbeddedChannel(  
                new LoggingHandler(LogLevel.DEBUG),  
                new MessageCodec()  
        );  

        // encode  
        LoginRequestMessage msg = new LoginRequestMessage("sidiot", "123456");  
        channel.writeOutbound(msg);  

        // decode  
        ByteBuf buf = ByteBufAllocator.DEFAULT.buffer();  
        new MessageCodec().encode(null, msg, buf);  

        // 入站  
        channel.writeInbound(buf);  
    }  
}

运行结果:

【Netty】「优化进阶」(三)Netty 通信协议设计:从 Redis、HTTP 和自定义协议看起

需要完整代码的读者请访问博主的 Github:MessageCodec.java

协议设计优化

避免半包现象

如果消息发送时,出现了半包现象,系统是否能解析呢?这里使用 ByteBuf 的逻辑切片 slice 来伪造半包现象,忘记的同学可以回看博文 ByteBuf 的性能优化

修改代码如下所示:

channel.writeInbound(buf.slice(0, 121));

运行结果:

【Netty】「优化进阶」(三)Netty 通信协议设计:从 Redis、HTTP 和自定义协议看起

为了避免上述情况,可以使用博主在上篇博文讲解的帧解码器 LengthFieldBasedFrameDecoder

修改代码如下所示:

EmbeddedChannel channel = new EmbeddedChannel(  
        new LengthFieldBasedFrameDecoder(1024, 12, 4, 0, 0),  
        new LoggingHandler(),  
        new MessageCodec()  
);

运行结果:

【Netty】「优化进阶」(三)Netty 通信协议设计:从 Redis、HTTP 和自定义协议看起

当然,如果后续的包能够在连接关闭前到来,那么 Netty 将会合并这些包,整合成一个完整的包,使得系统能够解析,运行结果如下:

【Netty】「优化进阶」(三)Netty 通信协议设计:从 Redis、HTTP 和自定义协议看起

需要完整代码的读者请访问博主的 Github:TestMessageCodec.java

提高 handler 的复用率

在使用 Netty 进行网络编程时,我们经常需要使用 handler 来处理数据流,并将其传递给下一个 handler 或者业务逻辑。为提高 handler 的复用率,可以将 handler 创建为 handler 对象,并在不同的 channel 中使用该 handler 对象进行数据处理操作。

例如,我们可以使用 LoggingHandler 来记录不同 channel 中的日志信息。为了提高复用率,我们可以创建一个 LoggingHandler 对象,并将其添加到多个 channel 的 pipeline 中。这样一来,不同 channel 中产生的日志信息都会传递给同一个 LoggingHandler 对象进行处理,从而实现了代码复用和资源节约。

LoggingHandler loggingHandler = new LoggingHandler(LogLevel.DEBUG);
channel1.pipeline().addLast(loggingHandler);
channel2.pipeline().addLast(loggingHandler);

然而,并非所有的 handler 都能通过这种方式来提高复用率。例如,在多个 channel 中共享同一个 LengthFieldBasedFrameDecoder 对象时,可能会发生以下问题:

假设 channel1 中收到了一个半包,LengthFieldBasedFrameDecoder 发现它不是一条完整的数据,因此不会向下传播该数据。但此时,如果 channel2 中也收到了一个半包,由于两个 channel 使用了同一个 LengthFieldBasedFrameDecoder 对象,存储在其中的数据刚好组成了一条完整的数据包,LengthFieldBasedFrameDecoder 就会让该数据包继续向下传播,最终导致错误。

为了避免这种问题,Netty 中提供了 @Sharable 注解来标识一个 handler 是否可被多个 channel 共享。只有被标记为 @Sharable 的 handler 才能够安全地被多个 channel 共享,并使得复用率得以提升,同时保证了内部状态的正确性和线程安全性。因此,在使用 Netty 中的 handler 时,需要注意其是否标记了 @Sharable 注解,以确保安全高效地进行代码复用。

【Netty】「优化进阶」(三)Netty 通信协议设计:从 Redis、HTTP 和自定义协议看起

那么我们自定义的协议 MessageCodec 可以直接加上 @Sharable 注解来实现共享吗?

【Netty】「优化进阶」(三)Netty 通信协议设计:从 Redis、HTTP 和自定义协议看起

答案是不能的。这是因为 ByteToMessageCodec 是一种处理网络数据的 handler,它将 ByteBuf 转化为特定的 Message 对象,使得数据更加易于处理和解析,但是在使用 ByteToMessageCodec 时,需要注意到传入该 handler 的 ByteBuf 可能并不是完整的数据包,而只是数据包的一部分或者多个数据包拼接而成的。

因此,如果多个 channel 共享同一个 ByteToMessageCodec 对象,则可能会引发一些并发问题。比如,一个 handler 尝试读取未完成的数据,并且在读取过程中修改了 ByteBuf 中的内容,那么其他 handler 也会受到这个修改的影响,从而导致程序出现异常或错误的行为。

不过这里可以使用 MessageToMessageDecoder 来实现共享,MessageToMessageDecoder 主要用于将已经被处理过的数据再次进行处理。因为 MessageToMessageDecoder 接收到的是已经被处理过的完整数据,所以即使被多个 channel 共享,也不会造成数据处理上的错误。代码如下所示:

@Slf4j  
@ChannelHandler.Sharable  
public class MessageCodecSharable extends MessageToMessageCodec<ByteBuf, Message> {
    @Override  
    protected void encode(ChannelHandlerContext ctx, Message msg, List<Object> outList) throws Exception { ... }
    
    @Override  
    protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception { ... }
}

要注意的是,MessageToMessageCodec 必须与 LengthFieldBasedFrameDecoder 一起使用,确保接收到的 ByteBuf 的完整性。

需要完整代码的读者请访问博主的 Github:MessageCodecSharable.javaChatServer.java

后记

在本文中,我们从 Redis、HTTP 和自定义协议三个方面分析了 Netty 通信协议的设计。对于 Redis 协议,我们了解了其基于字符串的设计和多条命令组合的方式,以及如何基于 Netty 构建自己的 Redis 协议解析器。对于 HTTP 协议,我们讲解了 HTTP 协议的基本结构、状态码、请求方法和报文格式,并演示了如何使用 Netty 发送和接收 HTTP 请求和响应。最后,我们介绍了自定义协议的设计,包括协议头和协议体的格式、编解码方式等关键要素,并给出了具体的实现代码。

当然,在实际的应用场景中,通信协议的设计也需要根据具体的业务需求进行优化和调整。但是,无论采用哪种协议,都需要遵守一定的规范和标准,以确保通信的正确性和稳定性。

以上就是 Netty 通信协议设计:从 Redis、HTTP 和自定义协议看起 的所有内容了,希望本篇博文对大家有所帮助!

参考:

📝 上篇精讲:「优化进阶」(二)浅谈 LengthFieldBasedFrameDecoder:如何实现可靠的消息分割?

💖 我是 𝓼𝓲𝓭𝓲𝓸𝓽,期待你的关注;

👍 创作不易,请多多支持;

🔥 系列专栏:探索 Netty:源码解析与应用案例分享