微服务组件之RPC(上)
微服务架构
优先级 red --> yellow --> blue --> green
前言
作为基础架构部的职责就是专门产出组件供业务部使用,传统的单体架构其主要组成部分由 网关、业务逻辑层、数据访问层,这三层架构都在同一个进程之间。随着业务的发展,又划分为垂直分层、水平分层等概念;例如:将网关、业务逻辑、数据访问分别单独作为一个进程独立运行。由于进程与进程之间交互的原因,引出了 RPC(远程过程调用)的概念和技术实现。 业务部门开发人员主要工作是负责将产品、运营部门提出来的需求通过代码做技术实现。 基础架构部门人员主要负责组件的 自研、选用、调研工作,根据公司的背景及需求做出决策。
RPC
RPC (远程过程调用)实现了不同的进程之间相互调用,RPC调用发生在进程与进程之间;RPC 拓展了算力(计算机计算能力)。RPC是进程间通信的一种手段,实现通信还可以通过 MQ ,MQ(异步、解耦、削峰) 和 RPC 是互联网中的两种最主要的通信方式。RPC、MQ、服务治理称为互联网三大标配。
- RPC的概念
- Remote Procedure Call 远程过程调用
- 基于网络表达语义和传达数据
- (会话层)通信协议
- 像调用本地方法一样调用远程服务
- 扩展了算力
- 服务治理的基础
- RPC的作用
- 屏蔽组包/解包
- 屏蔽数据发送/接收
- 提高开发效率
- 业务发展的必然产物
- RPC的核心组成
- 远程方法对象代理
- 连接管理
- 序列化/反序列化
- 寻址与负载均衡
- RPC的调用方式
- 同步调用
- 异步调用
- RPC的调用过程
RPC的自定义实现
- Client 工作内容
- 建立与Server的连接
- 封装请求数据
- 发送数据包给Server
- 接收Server处理结果数据包
- 解析处理结果数据包内容
- Server 工作内容
- 监听端口
- 响应client连接请求
- 接收client请求数据包
- 解析client请求数据包调用本地方法
- 封装调用方法处理结果数据包
- 响应处理结果数据包给client
- 序列化协议
- 远程调用涉及数据传输,就会产生数据组包和解包,需要调用方与服务方约定数据格式,约定即协议
- 请求响应交互示例
- 自定义协议类
public class RpcProtocol implements Serializable{ public static int CMD_CREATE_USER = 1; private int version; // 1 private int cmd; // 0 private int magicNum; // 0x20220824 private int bodyLen = 0; //12 private byte[] body; final public static int HEAD_LEN = 16; }
Cosumer代码实现
- Client连接
public class RpcClient { public static void main(String[] args) throws Exception { UserService proxyUserService = new UserService();//创建代理类 User user = new User();//构造请求数据 user.setAge((short) 26); user.setSex((short) 1); int ret = proxyUserService.addUser(user);//执行远程调用 if(ret == 0) System.out.println("调用远程服务创建用户成功!!!"); else System.out.println("调用远程服务创建用户失败!!!"); } }
- 构造请求数据
public int addUser (User userinfo) throws Exception { //初始化客户端连接 TcpClient client = TcpClient.GetInstance();//单例 try { client.init(); } catch (Exception e) { e.printStackTrace(); logger.error("init rpc client error"); } //构造请求数据 RpcProtocol rpcReq = new RpcProtocol(); rpcReq.setCmd(RpcProtocol.CMD_CREATE_USER); rpcReq.setVersion(0x01); rpcReq.setMagicNum(0x20110711); byte[] body = rpcReq.userInfoTobyteArray(userinfo); rpcReq.setBodyLen(body.length); rpcReq.setBody(body); //序列化 byte[] reqData = rpcReq.generateByteArray(); //发送请求 client.sendData(reqData); //接收请求结果 byte[] recvData = client.recvData(); //反序列化结果 RpcProtocol rpcResp = new RpcProtocol(); rpcResp.byteArrayToRpcHeader(recvData); int ret = ByteConverter.bytesToInt(rpcResp.getBody(), 0); return ret; }
- 序列化工具类
public class ByteConverter { //序列化 public static byte[] intToBytes(int n) { byte[] buf = new byte[4]; //序列化的本质就是将其转换为字节数组 for (int i = 0; i < buf.length; i++) { buf[i] = (byte) (n >> (8 * i)); } return buf; } //反序列化 public static int bytesToInt(byte[] buf, int offset) { return buf[offset] & 0xff | ((buf[offset + 1] << 8) & 0xff00) | ((buf[offset + 2] << 16) & 0xff0000) | ((buf[offset + 3] << 24) & 0xff000000); } }
Provider代码实现
- 启动服务 监听端口
public static void main(String[] args) throws Exception { Thread tcpServerThread = new Thread("tcpServer") { public void run() { TcpServer tcpServer = new TcpServer(SERVER_LISTEN_PORT); try { tcpServer.start();//启动服务 监听端口 } catch (Exception e) { logger.info("TcpServer start exception: " + e.getMessage()); } } }; tcpServerThread.start(); tcpServerThread.join(); }
- 使用netty
public void start() throws Exception { try { ServerBootstrap serverBootstrap = new ServerBootstrap(); serverBootstrap.group(bossGroup, workerGroup);//设置工作线程和接收线程 serverBootstrap.channel(NioServerSocketChannel.class); serverBootstrap.option(ChannelOption.SO_BACKLOG, 1024); //连接数 serverBootstrap.localAddress(this.port);//设置端口 serverBootstrap.childOption(ChannelOption.SO_KEEPALIVE, true); serverBootstrap.childHandler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel socketChannel) throws Exception { ChannelPipeline pipeline = socketChannel.pipeline(); pipeline.addLast(new PkgDecoder());//设置解码器 pipeline.addLast(new ServerHandler());//绑定处理类 } }); ChannelFuture channelFuture = serverBootstrap.bind().sync(); if (channelFuture.isSuccess()) { logger.info("rpc server start success!"); } else { logger.info("rpc server start fail!"); } channelFuture.channel().closeFuture().sync(); } catch (Exception ex) { logger.error("exception occurred exception=" + ex.getMessage()); } finally { bossGroup.shutdownGracefully().sync(); // 释放线程池资源 workerGroup.shutdownGracefully().sync(); } }
- 解码器
public class PkgDecoder extends ByteToMessageDecoder{ private Logger logger = LoggerFactory.getLogger(PkgDecoder.class); public PkgDecoder(){ } @Override protected void decode(ChannelHandlerContext ctx, ByteBuf buffer, List<Object> out) throws Exception{ if (buffer.readableBytes() < RpcProtocol.HEAD_LEN) { return; //未读完足够的字节流,缓存后继续读 } byte[] intBuf = new byte[4]; // ImHeader的bodyLen在第68位到71为, int类型 buffer.getBytes(buffer.readerIndex() + RpcProtocol.HEAD_LEN - 4, intBuf); int bodyLen = ByteConverter.bytesToIntBigEndian(intBuf); if (buffer.readableBytes() < RpcProtocol.HEAD_LEN + bodyLen) { return; //未读完足够的字节流,缓存后继续读 } byte[] bytesReady = new byte[RpcProtocol.HEAD_LEN + bodyLen]; buffer.readBytes(bytesReady); out.add(bytesReady); } }
- 请求处理
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { byte[] recvData = (byte[]) msg; if (recvData.length == 0) { logger.warn("receive request from client, but the data length is 0"); return; } logger.info("receive request from client, the data length is: " + recvData.length); //反序列化请求数据 RpcProtocol rpcReq = new RpcProtocol(); rpcReq.byteArrayToRpcHeader(recvData); if(rpcReq.getMagicNum() != RpcProtocol.CONST_CMD_MAGIC){ logger.warn("request msgic code error"); return; } //解析请求,并调用处理方法 int ret = -1; if(rpcReq.getCmd() == CMD_CREATE_USER){ User user = rpcReq.byteArrayToUserInfo(rpcReq.getBody()); UserService userService = new UserService(); ret = userService.addUser(user); //构造返回数据 RpcProtocol rpcResp = new RpcProtocol(); rpcResp.setCmd(rpcReq.getCmd()); rpcResp.setVersion(rpcReq.getVersion()); rpcResp.setMagicNum(rpcReq.getMagicNum()); rpcResp.setBodyLen(Integer.BYTES); byte[] body = rpcResp.createUserRespTobyteArray(ret); rpcResp.setBody(body); ByteBuf respData = Unpooled.copiedBuffer(rpcResp.generateByteArray()); ctx.channel().writeAndFlush(respData); } }
转载自:https://juejin.cn/post/7135366256042967048