Netty实现RPC服务器之自定义协议本篇主要介绍通过构建自定义协议,让我们的消息遵循我们自定义的协议在服务端、客户端之
Netty实现RPC服务器之自定义协议
前言
本篇是用Netty实现一个RPC中间件的第二篇,未来还会持续更新,代码在最后一片更新完毕之后将会上传到github、gitee代码托管平台,供大家拉取学习,并进行讨论分享,让该RPC框架持续精进。
TCP/IP 中消息传输基于流的方式,没有边界。如果不约定或者划定边界,就会造成消息黏包或者半包现象的发生,因此我们需要指定协议。协议的目的就是划定消息的边界,制定通信双方要共同遵守的通信规则。
本篇主要介绍通过构建自定义协议,让我们的消息遵循我们自定义的协议在服务端、客户端之间进行流转,类似于TCP或者IP的协议在运输层、网络层进行装包和拆包一样,从而使我们的消息适应于业务系统的需要,也更好的为业务系统进行赋能。
还是那句话:
愿我们在技术的浩瀚中游刃有余。
软件中目录
在项目中的目录如下所示:
在rpc_core
模块下的protocol
包下,放置我们所有的协议类。
自定义协议简介
一般自定义协议包含以下几个要素:
-
魔数:用来在第一时间判定是否是无效数据包,比如Java的class文件,开头(CAFEBABE)几个字节就是表示Java魔数的;
-
版本号:可以支持协议的升级;
-
序列化算法:消息正文到底采用哪种序列化反序列化方式,可以由此扩展,例如:json、protobuf、hessian、jdk;
-
指令类型:比如是登录、注册、单聊、群聊... 跟业务相关;
-
请求序号:为了双工通信,提供异步能力;
-
正文长度
-
消息正文
实际代码
自定义消息类
MessageCodec.java
/**
* 消息编解码器
* @author XiaoSheng
* @date 2024/8/21 下午12:19
*/
@Slf4j
@ChannelHandler.Sharable
public class MessageCodec extends ByteToMessageCodec<Message> {
@Override
public void encode(ChannelHandlerContext ctx, Message msg, ByteBuf out) throws Exception {
// 1. 4 字节的魔数
out.writeBytes(new byte[]{1, 2, 3, 4});
// 2. 1 字节的版本,
out.writeByte(1);
// 3. 1 字节的序列化方式 jdk 0 , json 1
out.writeByte(1);
// 4. 1 字节的指令类型
out.writeByte(msg.getMessageType());
// 5. 4 个字节
out.writeInt(msg.getSequenceId());
// 无意义,对齐填充
out.writeByte(0xff);
// 6. 获取内容的字节数组
ByteArrayOutputStream bos = new ByteArrayOutputStream();
ObjectOutputStream oos = new ObjectOutputStream(bos);
oos.writeObject(msg);
byte[] bytes = bos.toByteArray();
// 7. 长度
out.writeInt(bytes.length);
// 8. 写入内容
out.writeBytes(bytes);
}
@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
int magicNum = in.readInt();
byte version = in.readByte();
byte serializerType = in.readByte();
byte messageType = in.readByte();
int sequenceId = in.readInt();
in.readByte();
int length = in.readInt();
byte[] bytes = new byte[length];
in.readBytes(bytes, 0, length);
ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(bytes));
Message message = (Message) ois.readObject();
log.debug("{}, {}, {}, {}, {}, {}", magicNum, version, serializerType, messageType, sequenceId, length);
log.debug("{}", message);
out.add(message);
}
}
这里需要注意的是:
ByteToMessageCodec
的子类不能添加``@ChannelHandler.Sharable`注解,因为在编解码过程中存在中间状态,可能造成消息编解码出错...
编解码器,何时添加``@ChannelHandler.Sharable`被共享?
-
当 handler 不保存状态时,就可以安全地在多线程下被共享
-
但要注意对于编解码器类,不能继承
ByteToMessageCodec
或CombinedChannelDuplexHandler
父类,他们的构造方法对@Sharable
有限制 -
如果能确保编解码器不会保存状态,可以继承``MessageToMessageCodec` 父类
下面这个MessageCodecSharable可以添加``@ChannelHandler.Sharable`的原因是:
在client端和server端配合了``LengthFieldBasedFrameDecoder`
到达这个编解码的消息已经经过了``LengthFieldBasedFrameDecoder`
完全转化成为我们需要的消息,不存在中间状态...
必须配合LengthFieldBasedFrameDecoder
使用
/**
*
* 定长解码器 必须配合MessageCodecSharable使用
* @author XiaoSheng
* @date 2024/8/21 下午12:29
*/
public class ProcotolFrameDecoder extends LengthFieldBasedFrameDecoder {
public ProcotolFrameDecoder() {
this(1024, 12, 4, 0, 0);
}
public ProcotolFrameDecoder(int maxFrameLength, int lengthFieldOffset, int lengthFieldLength, int lengthAdjustment, int initialBytesToStrip) {
super(maxFrameLength, lengthFieldOffset, lengthFieldLength, lengthAdjustment, initialBytesToStrip);
}
}
再次观察我们的消息编解码类:
MessageCodecSharable.java
@Slf4j
@ChannelHandler.Sharable
/**
* 必须和 LengthFieldBasedFrameDecoder 一起使用,确保接到的 ByteBuf 消息是完整的
*/
public class MessageCodecSharable extends MessageToMessageCodec<ByteBuf, Message> {
@Override
protected void encode(ChannelHandlerContext ctx, Message msg, List<Object> outList) throws Exception {
ByteBuf out = ctx.alloc().buffer();
// 1. 4 字节的魔数
out.writeBytes(new byte[]{1, 2, 3, 4});
// 2. 1 字节的版本,
out.writeByte(1);
// 3. 1 字节的序列化方式 jdk 0 , json 1
out.writeByte(0);
// 4. 1 字节的指令类型
out.writeByte(msg.getMessageType());
// 5. 4 个字节
out.writeInt(msg.getSequenceId());
// 无意义,对齐填充
out.writeByte(0xff);
// 6. 获取内容的字节数组
ByteArrayOutputStream bos = new ByteArrayOutputStream();
ObjectOutputStream oos = new ObjectOutputStream(bos);
oos.writeObject(msg);
byte[] bytes = bos.toByteArray();
// 7. 长度
out.writeInt(bytes.length);
// 8. 写入内容
out.writeBytes(bytes);
outList.add(out);
}
@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
int magicNum = in.readInt();
byte version = in.readByte();
byte serializerType = in.readByte();
byte messageType = in.readByte();
int sequenceId = in.readInt();
in.readByte();
int length = in.readInt();
byte[] bytes = new byte[length];
in.readBytes(bytes, 0, length);
ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(bytes));
Message message = (Message) ois.readObject();
log.debug("magicNum={}, version={}, serializerType={}, messageType={}, sequenceId={}, length={}", magicNum, version, serializerType, messageType, sequenceId, length);
log.debug("message={}", message);
out.add(message);
}
}
序列化对象
Serializer.java
/**
* 扩展序列化、反序列化算法
* @author XiaoSheng
* @date 2024/8/21 下午12:10
*/
public interface Serializer {
// 反序列化方法
<T> T deserialize(Class<T> clazz, byte[] bytes);
// 序列化方法
<T> byte[] serialize(T object);
enum Algorithm implements Serializer {
Java {
@Override
public <T> T deserialize(Class<T> clazz, byte[] bytes) {
try {
ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(bytes));
return (T) ois.readObject();
} catch (IOException | ClassNotFoundException e) {
throw new RuntimeException("反序列化失败", e);
}
}
@Override
public <T> byte[] serialize(T object) {
try {
ByteArrayOutputStream bos = new ByteArrayOutputStream();
ObjectOutputStream oos = new ObjectOutputStream(bos);
oos.writeObject(object);
return bos.toByteArray();
} catch (IOException e) {
throw new RuntimeException("序列化失败", e);
}
}
},
Json {
@Override
public <T> T deserialize(Class<T> clazz, byte[] bytes) {
Gson gson = new GsonBuilder().registerTypeAdapter(Class.class, new Serializer.ClassCodec()).create();
String json = new String(bytes, StandardCharsets.UTF_8);
return gson.fromJson(json, clazz);
}
@Override
public <T> byte[] serialize(T object) {
Gson gson = new GsonBuilder().registerTypeAdapter(Class.class, new Serializer.ClassCodec()).create();
String json = gson.toJson(object);
return json.getBytes(StandardCharsets.UTF_8);
}
}
}
class ClassCodec implements JsonSerializer<Class<?>>, JsonDeserializer<Class<?>> {
@Override
public Class<?> deserialize(JsonElement json, Type typeOfT, JsonDeserializationContext context) throws JsonParseException {
try {
String str = json.getAsString();
return Class.forName(str);
} catch (ClassNotFoundException e) {
throw new JsonParseException(e);
}
}
@Override // String.class
public JsonElement serialize(Class<?> src, Type typeOfSrc, JsonSerializationContext context) {
// class -> json
return new JsonPrimitive(src.getName());
}
}
}
序号生成器
序号生成器需要保证线程安全,这里采用JUC中的原子类AtomicInteger
进行序号自增式的生成。
SequenceIdGenerator.java
/**
* @author XiaoSheng
* @date 2024/8/21 下午12:16
*/
public class SequenceIdGenerator {
private static final AtomicInteger id = new AtomicInteger();
public static int nextId() {
return id.incrementAndGet();
}
}
编解码协议解释
魔数(4字节)+版本号(1字节)+序列化方式(1字节)+ 指令类型(1字节)+请求序号(4字节)+消息长度(4字节)=15字节;
一般设计成2的整数倍。2^4=16字节,所以中间填充一个字节
测试代码
import com.netty.project.protocol.MessageCodec;
import io.netty.channel.embedded.EmbeddedChannel;
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
import io.netty.handler.logging.LoggingHandler;
public class MessageCodecTest {
public static void main(String[] args) throws Exception{
EmbeddedChannel channel = new EmbeddedChannel(
new LoggingHandler(),
new LengthFieldBasedFrameDecoder(
1024, 12, 4, 0, 0),
new MessageCodec()
);
// encode
LoginRequestMessage message = new LoginRequestMessage("zhangsan", "123", "张三");
channel.writeOutbound(message);
}
}
01 02 03 04:魔数
01 :版本号
00 :序列化方式
00 :指令类型
00 00 00 00:请求序号
ff :对齐填充
00 00 00 ef:消息正文长度(e=14,f=15)14*16+15=239直接+协议上面16=255
最后
上面就是关于Netty实现RPC中关于自定义协议部分的内容讲解,中间也介绍了自定义协议的定义规则,需要的序列化对象,序号生成器等工具类,并提供测试类。**后续的全部源码将在本专栏中的最后一期文章当中。**有兴趣的同学可以对本专栏进行订阅。
转载自:https://juejin.cn/post/7408747421281304612