likes
comments
collection
share

Netty实现RPC服务器之自定义协议本篇主要介绍通过构建自定义协议,让我们的消息遵循我们自定义的协议在服务端、客户端之

作者站长头像
站长
· 阅读数 21

Netty实现RPC服务器之自定义协议

前言

本篇是用Netty实现一个RPC中间件的第二篇,未来还会持续更新,代码在最后一片更新完毕之后将会上传到github、gitee代码托管平台,供大家拉取学习,并进行讨论分享,让该RPC框架持续精进。

TCP/IP 中消息传输基于流的方式,没有边界。如果不约定或者划定边界,就会造成消息黏包或者半包现象的发生,因此我们需要指定协议。协议的目的就是划定消息的边界,制定通信双方要共同遵守的通信规则。

本篇主要介绍通过构建自定义协议,让我们的消息遵循我们自定义的协议在服务端、客户端之间进行流转,类似于TCP或者IP的协议在运输层、网络层进行装包和拆包一样,从而使我们的消息适应于业务系统的需要,也更好的为业务系统进行赋能。

还是那句话:

愿我们在技术的浩瀚中游刃有余。

软件中目录

在项目中的目录如下所示:

Netty实现RPC服务器之自定义协议本篇主要介绍通过构建自定义协议,让我们的消息遵循我们自定义的协议在服务端、客户端之

rpc_core模块下的protocol包下,放置我们所有的协议类。

自定义协议简介

一般自定义协议包含以下几个要素:

  • 魔数:用来在第一时间判定是否是无效数据包,比如Java的class文件,开头(CAFEBABE)几个字节就是表示Java魔数的;

  • 版本号:可以支持协议的升级;

  • 序列化算法:消息正文到底采用哪种序列化反序列化方式,可以由此扩展,例如:json、protobuf、hessian、jdk;

  • 指令类型:比如是登录、注册、单聊、群聊... 跟业务相关;

  • 请求序号:为了双工通信,提供异步能力;

  • 正文长度

  • 消息正文

实际代码

自定义消息类

MessageCodec.java


/**
 * 消息编解码器
 * @author XiaoSheng
 * @date 2024/8/21 下午12:19
 */
@Slf4j
@ChannelHandler.Sharable
public class MessageCodec extends ByteToMessageCodec<Message> {

    @Override
    public void encode(ChannelHandlerContext ctx, Message msg, ByteBuf out) throws Exception {
        // 1. 4 字节的魔数
        out.writeBytes(new byte[]{1, 2, 3, 4});
        // 2. 1 字节的版本,
        out.writeByte(1);
        // 3. 1 字节的序列化方式 jdk 0 , json 1
        out.writeByte(1);
        // 4. 1 字节的指令类型
        out.writeByte(msg.getMessageType());
        // 5. 4 个字节
        out.writeInt(msg.getSequenceId());
        // 无意义,对齐填充
        out.writeByte(0xff);
        // 6. 获取内容的字节数组
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        ObjectOutputStream oos = new ObjectOutputStream(bos);
        oos.writeObject(msg);
        byte[] bytes = bos.toByteArray();
        // 7. 长度
        out.writeInt(bytes.length);
        // 8. 写入内容
        out.writeBytes(bytes);
    }

    @Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
        int magicNum = in.readInt();
        byte version = in.readByte();
        byte serializerType = in.readByte();
        byte messageType = in.readByte();
        int sequenceId = in.readInt();
        in.readByte();
        int length = in.readInt();
        byte[] bytes = new byte[length];
        in.readBytes(bytes, 0, length);
        ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(bytes));
        Message message = (Message) ois.readObject();
        log.debug("{}, {}, {}, {}, {}, {}", magicNum, version, serializerType, messageType, sequenceId, length);
        log.debug("{}", message);
        out.add(message);
    }

}

这里需要注意的是:

Netty实现RPC服务器之自定义协议本篇主要介绍通过构建自定义协议,让我们的消息遵循我们自定义的协议在服务端、客户端之

ByteToMessageCodec的子类不能添加``@ChannelHandler.Sharable`注解,因为在编解码过程中存在中间状态,可能造成消息编解码出错...

编解码器,何时添加``@ChannelHandler.Sharable`被共享?

  1. 当 handler 不保存状态时,就可以安全地在多线程下被共享

  2. 但要注意对于编解码器类,不能继承 ByteToMessageCodecCombinedChannelDuplexHandler 父类,他们的构造方法对 @Sharable 有限制

  3. 如果能确保编解码器不会保存状态,可以继承``MessageToMessageCodec` 父类

下面这个MessageCodecSharable可以添加``@ChannelHandler.Sharable`的原因是:

在client端和server端配合了``LengthFieldBasedFrameDecoder`

到达这个编解码的消息已经经过了``LengthFieldBasedFrameDecoder`

完全转化成为我们需要的消息,不存在中间状态...

必须配合LengthFieldBasedFrameDecoder使用

/**
 *
 * 定长解码器 必须配合MessageCodecSharable使用
 * @author XiaoSheng
 * @date 2024/8/21 下午12:29
 */
public class ProcotolFrameDecoder extends LengthFieldBasedFrameDecoder  {

    public ProcotolFrameDecoder() {
        this(1024, 12, 4, 0, 0);
    }

    public ProcotolFrameDecoder(int maxFrameLength, int lengthFieldOffset, int lengthFieldLength, int lengthAdjustment, int initialBytesToStrip) {
        super(maxFrameLength, lengthFieldOffset, lengthFieldLength, lengthAdjustment, initialBytesToStrip);
    }

}

Netty实现RPC服务器之自定义协议本篇主要介绍通过构建自定义协议,让我们的消息遵循我们自定义的协议在服务端、客户端之

再次观察我们的消息编解码类:

MessageCodecSharable.java


@Slf4j
@ChannelHandler.Sharable
/**
 * 必须和 LengthFieldBasedFrameDecoder 一起使用,确保接到的 ByteBuf 消息是完整的
 */
public class MessageCodecSharable extends MessageToMessageCodec<ByteBuf, Message> {
    @Override
    protected void encode(ChannelHandlerContext ctx, Message msg, List<Object> outList) throws Exception {
        ByteBuf out = ctx.alloc().buffer();
        // 1. 4 字节的魔数
        out.writeBytes(new byte[]{1, 2, 3, 4});
        // 2. 1 字节的版本,
        out.writeByte(1);
        // 3. 1 字节的序列化方式 jdk 0 , json 1
        out.writeByte(0);
        // 4. 1 字节的指令类型
        out.writeByte(msg.getMessageType());
        // 5. 4 个字节
        out.writeInt(msg.getSequenceId());
        // 无意义,对齐填充
        out.writeByte(0xff);
        // 6. 获取内容的字节数组
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        ObjectOutputStream oos = new ObjectOutputStream(bos);
        oos.writeObject(msg);
        byte[] bytes = bos.toByteArray();
        // 7. 长度
        out.writeInt(bytes.length);
        // 8. 写入内容
        out.writeBytes(bytes);
        outList.add(out);
    }
 
    @Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
        int magicNum = in.readInt();
        byte version = in.readByte();
        byte serializerType = in.readByte();
        byte messageType = in.readByte();
        int sequenceId = in.readInt();
        in.readByte();
        int length = in.readInt();
        byte[] bytes = new byte[length];
        in.readBytes(bytes, 0, length);
        ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(bytes));
        Message message = (Message) ois.readObject();
        log.debug("magicNum={}, version={}, serializerType={}, messageType={}, sequenceId={}, length={}", magicNum, version, serializerType, messageType, sequenceId, length);
        log.debug("message={}", message);
        out.add(message);
    }
}

序列化对象

Serializer.java

/**
 * 扩展序列化、反序列化算法
 * @author XiaoSheng
 * @date 2024/8/21 下午12:10
 */
public interface Serializer {

    // 反序列化方法
    <T> T deserialize(Class<T> clazz, byte[] bytes);

    // 序列化方法
    <T> byte[] serialize(T object);

    enum Algorithm implements Serializer {

        Java {
            @Override
            public <T> T deserialize(Class<T> clazz, byte[] bytes) {
                try {
                    ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(bytes));
                    return (T) ois.readObject();
                } catch (IOException | ClassNotFoundException e) {
                    throw new RuntimeException("反序列化失败", e);
                }
            }

            @Override
            public <T> byte[] serialize(T object) {
                try {
                    ByteArrayOutputStream bos = new ByteArrayOutputStream();
                    ObjectOutputStream oos = new ObjectOutputStream(bos);
                    oos.writeObject(object);
                    return bos.toByteArray();
                } catch (IOException e) {
                    throw new RuntimeException("序列化失败", e);
                }
            }
        },

        Json {
            @Override
            public <T> T deserialize(Class<T> clazz, byte[] bytes) {
                Gson gson = new GsonBuilder().registerTypeAdapter(Class.class, new Serializer.ClassCodec()).create();
                String json = new String(bytes, StandardCharsets.UTF_8);
                return gson.fromJson(json, clazz);
            }

            @Override
            public <T> byte[] serialize(T object) {
                Gson gson = new GsonBuilder().registerTypeAdapter(Class.class, new Serializer.ClassCodec()).create();
                String json = gson.toJson(object);
                return json.getBytes(StandardCharsets.UTF_8);
            }
        }
    }
    class ClassCodec implements JsonSerializer<Class<?>>, JsonDeserializer<Class<?>> {

        @Override
        public Class<?> deserialize(JsonElement json, Type typeOfT, JsonDeserializationContext context) throws JsonParseException {
            try {
                String str = json.getAsString();
                return Class.forName(str);
            } catch (ClassNotFoundException e) {
                throw new JsonParseException(e);
            }
        }

        @Override             //   String.class
        public JsonElement serialize(Class<?> src, Type typeOfSrc, JsonSerializationContext context) {
            // class -> json
            return new JsonPrimitive(src.getName());
        }
    }

}

序号生成器

序号生成器需要保证线程安全,这里采用JUC中的原子类AtomicInteger进行序号自增式的生成。

SequenceIdGenerator.java

/**
 * @author XiaoSheng
 * @date 2024/8/21 下午12:16
 */
public class SequenceIdGenerator {
    private static final AtomicInteger id = new AtomicInteger();

    public static int nextId() {
        return id.incrementAndGet();
    }

}

编解码协议解释

魔数(4字节)+版本号(1字节)+序列化方式(1字节)+ 指令类型(1字节)+请求序号(4字节)+消息长度(4字节)=15字节;

一般设计成2的整数倍。2^4=16字节,所以中间填充一个字节

测试代码

import com.netty.project.protocol.MessageCodec;
import io.netty.channel.embedded.EmbeddedChannel;
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
import io.netty.handler.logging.LoggingHandler;
 
public class MessageCodecTest {
    public static void main(String[] args) throws Exception{
        EmbeddedChannel channel = new EmbeddedChannel(
                new LoggingHandler(),
                new LengthFieldBasedFrameDecoder(
                        1024, 12, 4, 0, 0),
                new MessageCodec()
        );
// encode
        LoginRequestMessage message = new LoginRequestMessage("zhangsan", "123", "张三");
        channel.writeOutbound(message);
    }
}

Netty实现RPC服务器之自定义协议本篇主要介绍通过构建自定义协议,让我们的消息遵循我们自定义的协议在服务端、客户端之

01 02 03 04:魔数

01 :版本号

00 :序列化方式

00 :指令类型

00 00 00 00:请求序号

ff :对齐填充

00 00 00 ef:消息正文长度(e=14,f=15)14*16+15=239直接+协议上面16=255

最后

上面就是关于Netty实现RPC中关于自定义协议部分的内容讲解,中间也介绍了自定义协议的定义规则,需要的序列化对象,序号生成器等工具类,并提供测试类。**后续的全部源码将在本专栏中的最后一期文章当中。**有兴趣的同学可以对本专栏进行订阅。

转载自:https://juejin.cn/post/7408747421281304612
评论
请登录