likes
comments
collection
share

微服务组件之RPC(上)

作者站长头像
站长
· 阅读数 68

微服务架构

优先级 red --> yellow --> blue --> green

微服务组件之RPC(上)

前言

作为基础架构部的职责就是专门产出组件供业务部使用,传统的单体架构其主要组成部分由 网关、业务逻辑层、数据访问层,这三层架构都在同一个进程之间。随着业务的发展,又划分为垂直分层、水平分层等概念;例如:将网关、业务逻辑、数据访问分别单独作为一个进程独立运行。由于进程与进程之间交互的原因,引出了 RPC(远程过程调用)的概念和技术实现。 业务部门开发人员主要工作是负责将产品、运营部门提出来的需求通过代码做技术实现。 基础架构部门人员主要负责组件的 自研、选用、调研工作,根据公司的背景及需求做出决策。

RPC

RPC (远程过程调用)实现了不同的进程之间相互调用,RPC调用发生在进程与进程之间;RPC 拓展了算力(计算机计算能力)。RPC是进程间通信的一种手段,实现通信还可以通过 MQ ,MQ(异步、解耦、削峰) 和 RPC 是互联网中的两种最主要的通信方式。RPC、MQ、服务治理称为互联网三大标配。

  • RPC的概念
    • Remote Procedure Call 远程过程调用
    • 基于网络表达语义和传达数据
    • (会话层)通信协议
    • 像调用本地方法一样调用远程服务
    • 扩展了算力
    • 服务治理的基础
  • RPC的作用
    • 屏蔽组包/解包
    • 屏蔽数据发送/接收
    • 提高开发效率
    • 业务发展的必然产物
  • RPC的核心组成
    • 远程方法对象代理
    • 连接管理
    • 序列化/反序列化
    • 寻址与负载均衡
  • RPC的调用方式
    • 同步调用
    • 异步调用
  • RPC的调用过程

微服务组件之RPC(上)

RPC的自定义实现

  • Client 工作内容
    • 建立与Server的连接
    • 封装请求数据
    • 发送数据包给Server
    • 接收Server处理结果数据包
    • 解析处理结果数据包内容
  • Server 工作内容
    • 监听端口
    • 响应client连接请求
    • 接收client请求数据包
    • 解析client请求数据包调用本地方法
    • 封装调用方法处理结果数据包
    • 响应处理结果数据包给client
  • 序列化协议
    • 远程调用涉及数据传输,就会产生数据组包和解包,需要调用方与服务方约定数据格式,约定即协议
    微服务组件之RPC(上)
    • 请求响应交互示例
    微服务组件之RPC(上)
    • 自定义协议类
    public class RpcProtocol implements Serializable{
        public static int CMD_CREATE_USER = 1;
        private int version; // 1
        private int cmd; // 0
        private int magicNum; // 0x20220824
        private int bodyLen = 0; //12
        private byte[] body;
        final public static int HEAD_LEN = 16;
    }
    
    微服务组件之RPC(上)

Cosumer代码实现

  • Client连接
    public class RpcClient {
        public static void main(String[] args) throws Exception {
            UserService proxyUserService = new UserService();//创建代理类
    
            User user = new User();//构造请求数据
            user.setAge((short) 26);
            user.setSex((short) 1);
    
    
            int ret = proxyUserService.addUser(user);//执行远程调用
            if(ret == 0)
                System.out.println("调用远程服务创建用户成功!!!");
            else
                System.out.println("调用远程服务创建用户失败!!!");
        }
    }
    
  • 构造请求数据
    public int addUser (User userinfo) throws Exception {
        //初始化客户端连接
        TcpClient client = TcpClient.GetInstance();//单例
        try {
            client.init();
        } catch (Exception e) {
            e.printStackTrace();
            logger.error("init rpc client error");
        }
    
        //构造请求数据
        RpcProtocol rpcReq = new RpcProtocol();
        rpcReq.setCmd(RpcProtocol.CMD_CREATE_USER);
        rpcReq.setVersion(0x01);
        rpcReq.setMagicNum(0x20110711);
        byte[] body = rpcReq.userInfoTobyteArray(userinfo);
        rpcReq.setBodyLen(body.length);
        rpcReq.setBody(body);
    
        //序列化
        byte[] reqData = rpcReq.generateByteArray();
    
        //发送请求
        client.sendData(reqData);
    
        //接收请求结果
        byte[] recvData = client.recvData();
    
        //反序列化结果
        RpcProtocol rpcResp = new RpcProtocol();
        rpcResp.byteArrayToRpcHeader(recvData);
        int ret = ByteConverter.bytesToInt(rpcResp.getBody(), 0);
        return ret;
    }
    
  • 序列化工具类
        public class ByteConverter {
            //序列化
            public static byte[] intToBytes(int n) {
                byte[] buf = new byte[4]; //序列化的本质就是将其转换为字节数组
                for (int i = 0; i < buf.length; i++) {
                    buf[i] = (byte) (n >> (8 * i));
                }
                return buf;
            }
            //反序列化
            public static int bytesToInt(byte[] buf, int offset) {
                return buf[offset] & 0xff
                        | ((buf[offset + 1] << 8) & 0xff00)
                        | ((buf[offset + 2] << 16) & 0xff0000)
                        | ((buf[offset + 3] << 24) & 0xff000000);
            }
        }
    

Provider代码实现

  • 启动服务 监听端口
    public static void main(String[] args) throws Exception {
          Thread tcpServerThread = new Thread("tcpServer") {
              public void  run() {
                  TcpServer tcpServer = new TcpServer(SERVER_LISTEN_PORT);
                  try {
                      tcpServer.start();//启动服务 监听端口
                  } catch (Exception e) {
                      logger.info("TcpServer start exception: " + e.getMessage());
                  }
              }
          };
          tcpServerThread.start();
          tcpServerThread.join();
      }
    
  • 使用netty
    public void start() throws Exception {
          try {
              ServerBootstrap serverBootstrap = new ServerBootstrap();
              serverBootstrap.group(bossGroup, workerGroup);//设置工作线程和接收线程
              serverBootstrap.channel(NioServerSocketChannel.class);
              serverBootstrap.option(ChannelOption.SO_BACKLOG, 1024); //连接数
              serverBootstrap.localAddress(this.port);//设置端口
              serverBootstrap.childOption(ChannelOption.SO_KEEPALIVE, true);
              serverBootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
                  @Override
                  protected void initChannel(SocketChannel socketChannel) throws Exception {
                      ChannelPipeline pipeline = socketChannel.pipeline();
                      pipeline.addLast(new PkgDecoder());//设置解码器
                      pipeline.addLast(new ServerHandler());//绑定处理类
                  }
              });
    
              ChannelFuture channelFuture = serverBootstrap.bind().sync();
              if (channelFuture.isSuccess()) {
                  logger.info("rpc server start success!");
              } else {
                  logger.info("rpc server start fail!");
              }
              channelFuture.channel().closeFuture().sync();
          } catch (Exception ex) {
              logger.error("exception occurred exception=" + ex.getMessage());
          } finally {
              bossGroup.shutdownGracefully().sync();          // 释放线程池资源
              workerGroup.shutdownGracefully().sync();
          }
      }
    
  • 解码器
    public class PkgDecoder extends ByteToMessageDecoder{
         private Logger logger = LoggerFactory.getLogger(PkgDecoder.class);
    
         public PkgDecoder(){ }
    
         @Override
         protected void decode(ChannelHandlerContext ctx, ByteBuf buffer, List<Object> out)  throws Exception{
             if (buffer.readableBytes() < RpcProtocol.HEAD_LEN) {
                 return; //未读完足够的字节流,缓存后继续读
             }
    
             byte[] intBuf = new byte[4];
             // ImHeader的bodyLen在第68位到71为, int类型
             buffer.getBytes(buffer.readerIndex() + RpcProtocol.HEAD_LEN - 4, intBuf);    
             int bodyLen = ByteConverter.bytesToIntBigEndian(intBuf);
    
             if (buffer.readableBytes() < RpcProtocol.HEAD_LEN + bodyLen) {
                 return; //未读完足够的字节流,缓存后继续读
             }
    
             byte[] bytesReady = new byte[RpcProtocol.HEAD_LEN + bodyLen];
             buffer.readBytes(bytesReady);
             out.add(bytesReady);
         }
     }
    
  • 请求处理
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
         byte[] recvData = (byte[]) msg;
         if (recvData.length == 0) {
             logger.warn("receive request from client, but the data length is 0");
             return;
         }
    
         logger.info("receive request from client, the data length is: " + recvData.length);
    
         //反序列化请求数据
         RpcProtocol rpcReq = new RpcProtocol();
         rpcReq.byteArrayToRpcHeader(recvData);
    
         if(rpcReq.getMagicNum() != RpcProtocol.CONST_CMD_MAGIC){
             logger.warn("request msgic code error");
             return;
         }
    
         //解析请求,并调用处理方法
         int ret = -1;
         if(rpcReq.getCmd() == CMD_CREATE_USER){
             User user = rpcReq.byteArrayToUserInfo(rpcReq.getBody());
             UserService userService = new UserService();
             ret = userService.addUser(user);
    
    
             //构造返回数据
             RpcProtocol rpcResp = new RpcProtocol();
             rpcResp.setCmd(rpcReq.getCmd());
             rpcResp.setVersion(rpcReq.getVersion());
             rpcResp.setMagicNum(rpcReq.getMagicNum());
             rpcResp.setBodyLen(Integer.BYTES);
             byte[] body = rpcResp.createUserRespTobyteArray(ret);
             rpcResp.setBody(body);
             ByteBuf respData = Unpooled.copiedBuffer(rpcResp.generateByteArray());
             ctx.channel().writeAndFlush(respData);
         }
     }
    
转载自:https://juejin.cn/post/7135366256042967048
评论
请登录