likes
comments
collection
share

微服务组件之RPC(中)

作者站长头像
站长
· 阅读数 9

RPC 基础功能

  • 数据传输
  • 序列化/反序列化
  • 客户端代理类
  • 请求映射分发

RPC 产品功能

除了RPC基础功能为,RPC还具备八大功能模块 Consumer:连接管理、负载均衡、请求路由、超时处理 Provider:队列/线程池、超时丢弃、优雅关闭、过载保护

Consumer 核心设计

完整的consumer功能模块图 微服务组件之RPC(中)

  • 请求路由 通过一系列规则过滤出可以选择的provider节点列表,在应用隔离,读写分离灰度发布中发挥关键作用。
    • 匹配规则
      • 规则描述
        • 待比较属性
        • 运算符
        • 属性匹配值
        • 匹配节点
      • 数据结构设计
        • 链表 以IP分流规则举例 微服务组件之RPC(中)
    • 行为
    • 链表
  • 连接管理 保持与Provider的长连接,用于传输数据和返回结果,通常情况下基于tcp连接,udp&http也可以实现。
    • 初始化时机
      • 饿汉模式-服务间连接、数据库连接
      • 懒汉模式-网关连接
    • 连接数维护
      • 服务连接池
      • 数据库连接池
    • 心跳/断线重连机制
    • client线程模型
  • 负载均衡 确保多个Provider实例节点流量均匀合理(并非绝对平均);支持节点扩容缩容与灰度发布(调整流量权重比例)
    • 轮询 —— 按顺序轮流分发请求
    • 随机 —— 随机选择分发请求
    • 取模 —— 请求数 % 实例数得到集群下标,将请求分发给指定下标的集群实例。
    • 权重 —— 根据实例的性能设置权重值进行请求分发
    • 一致性hash —— 根据hash值将 0 - 2^32次方 按照实例数划分为不同的hash范围,根据范围值分发请求
  • 超时处理 对于长时间没有响应的请求,需要作异常处理,及时释放资源。
    • 工作线程阻塞位置
      • 等待响应通知
    • 超时逻辑
      • 工作线程等待通知
      • 数据返回终止等待
      • 超时抛出异常
    • 数据结构
      • Map<key:sessionID,value:WindowData>

Provider 核心设计

队列/线程池

将不同类型的请求,放入各自的队列,每个队列分配独立的线程池,进行资源隔离. 这里的Thread Pool并非指java中的线程池对象,这里是真正的线程池。 微服务组件之RPC(中) Provider 作为服务提供方,对调用方的信息是无感知的,随业务发展可能会有N个consumer对provider进行调用,工作线程有限,因而需要设计数据结构(队列)对请求进行存放。 数据结构设计分析:

  • 请求量多大?是否需要削峰填谷缓存请求?
  • 是否存在请求顺序性要求?
  • 请求之间相互隔离?不同的请求放不同的队列,不同队列使用不同线程池,资源隔离 线程与线程之间什么样的通信方式是最安全的? 通过通信方式来共享内存,不要通过共享内存来通信。线程与线程之间通过队列方式通信是最好的。 补充知识: OS分配资源的基本单位是进程,cpu调度的基本单位是线程。

线程分配的合理数: 线程数 = CPU核数 * 2 + 2;线程过多会造成线程等待和上下文切换的资源开销;线程太少又无法充分利用cpu资源。

这里就不提供压测截图了,关于压测有兴趣可以百度自行了解,或者找测试同学沟通 QPS 队列\线程 | 2w+ | 5w+ | 8w+ | 11w+

  • 单队列多线程 1*64 | 0.00s | 0.01/0.00s | 0.05/0.00s | 0.19/0.01s
  • 多队列单线程 64*1 | 0.00s | 0.01/0.00s | 0.02/0.00 | 0.07/0.01s

在QPS 8w+ 时 单队列多线程模型的耗时明显比多队列单线程高

超时丢弃

快速失败已超时的请求,缓解队列的压力

微服务组件之RPC(中)

  1. IO线程处理反序列化后将请求放入队列(req Queue)

入队代码

publick void run(Group group, IAsyncHandler handler){
    //构造 AsyncTask 对象放入队列
    AsyncTask task = new AsyncTask(taskTimeout,handler);
    balance(group,task);
}

//异步任务构造方法
public AsyncTask(int timeout,IAsyncHandler handler){
    super();
    if(timeout < 0){
      timeout = 1000;
    }
    this.timeout = timeout;//超时时间
    this.handler = handler;
    this.addTime = System.currentTimeMillis();//入队时间
}
  1. 当线程池中工作线程从队列中取出请求进行处理时,判断当前请求是否超时。超时则丢弃,未超时则调用服务处理

出队代码

public void run() {
    while(!isStop){
        AsyncTask task  = null;
        try {
            task = taskQueue.poll(1500,TimeUnit.MILLISECONDS);//取出队列任务
            if(null != task){
                execTimeoutTask(task);//判断任务是否超时
            }
        }catch (InterruptedException e){
            logger.error("has error!",e);
        }catch (Throwable ex){
            if(task !=null ){
                task.getHandler().exceptionCaught(ex);
            }
        }
    }
}

public void execTimeoutTask(AsyncTask task) throw {
    //当前时间减去入队时间是否大于超时时间 超时回调超时
    if(System.currentTimeMillis() - task.getAddTime() > task.getTimeout()){
        task.getHandler().exceptionCaught(new TimeoutException(threadFactoryName + "async task timeout!"));
        return;
    } else {
       //没有超时回调处理任务
      Object obj = task.getHandler().run();
      task.getHandler().messageReceived();
    }
}

对于consumer 而言,一个请求调用的超时时间 = 请求队列排队时间(主要耗时) + 程序处理时间

优雅关闭

进程结束前应确保队列中的的请求全部处理完成,这路的处理完成是直接回复 并非处理逻辑。 如何通知调用方?

  • 返回数据中带关闭信息 建议
  • 专门关闭协议通知调用方 不建议 服务提供方对调用方是无感知的;即便是在知道调用方的情况下这样的设计也会造成循环依赖,且调用可能失败,系统复杂度过高。 返回数据中带关闭信息可以复用现有RPC通道

优雅关闭Server端实现

  • 监听关闭信号 kill -12
@Component
 public class SignalConfig {
     @Autowired
     private SignalRegistry signalRegistry
     public void init() { signalRegistry.register(); }
 }
 //监听关闭信号
 public void register() {
     try {
         if(StringUtils.isNotBlank(osName) && (!isMac() && !isWindows())) {
             Signal sig = new Signal("USR2");//这里的USR2 其实就是 -12 代表的是用户自定义信号
             //这表示当用户通过 kill -12 来 结束某个进程时 回调  operateSignalHandler 函数进行处理
             Signal.handle(sig, operateSignalHandler);
             
             Signal sig2 = new Signal("STKFLT");
             Signal.handle(sig2, breakHeartbeatSignal);
         }
     } catch ( Exception e ) {
         logger.error("------------ signal register failed ! --------------",e)
     }
 }
  • 改变服务状态
 //改变服务状态
 @Component
 public class OperateSignal implements SignalHandler {
     private static Logger logger = LoggerFactory.getLogger(OperateSignal.class);
     @Autowired
     private RpcContext rpcContetx;
     @Override
     public void handle(Singal singalName){
         logger.info("server : {} current state is : {}",new Object[] {rpcContext.getServiceName(), rpcContext.getServerState()});
         //设置当前服务状态为重启
         rpcContext.setServerState(ServerStateType.Reboot);
         logger.info("server : {} will reboot !", new Object[]{rpcContext.getServiceName()});
     }
 }
  • 通知客户端
//超时处理判断请求没有超时时调用 service方法处理请求
public Object innerInvoke(RpcContext rpcContext, MethodSignature methodSignature) throw Exception {
    //这里使用责任链模式维护了RPC上下文对象
    requestFilter(context);
    Object response = null;
    RPCContext.setThreadLocal(context);
    ..... 省略.....
    context.getResponseProtocol().setSdpEntity(response);
    responseFilter(context);
    RpcContext.clear();
    return context;
}

protected void requestFilter(RpcContext context) throws Excpetion {
 logger.debug("begin requestFilters");
 for(IFilter filter : requestFilters){
     if(context.getExecFilter() == ExecFilterType.ALL ||
        context.getExecFilter() == ExecFilterType.RequestOnly){
        filter.filter(context);
     }
     logger.debug("end requestFilters");
 }
}

public void filter(RpcContext context) throws Exception {
    if(serviceContext.getServerState) == ServerStateType.Reboot &&
    (protocol.getPlatformType() == PlatformType.Java||protocol.getPlatformType() == PlatformType.PHP)) {
        //封装RPC响应
        RpcResponse response  = new RpcResponse();
        ResetProtocol rp = new ResetProtocol();
        rp.setMsg("This server is reboot!");
        responseProtocol.setSdpEntity(rp);
        response.setResponseBuffer(responseProtocol.toBytes(false,null));
        context.setRpcResponse(response);
        context.setExecFilter(ExecFilterType.None);
        //不再继续过滤
        context.setDoInvoke(false);
    }
}

程序启动后初始化,初始化时 初始化容器、初始化插件、初始化监听信号(重要)、初始化服务 解释: 通常结束进程我们用 kill -9 命令 这里的 9 其实就是一种信号量,用户可以自定义信号量发送给操作系统,可以自定义实现

优雅关闭client端实现

  • 根据返回改变节点状态
public Protocol request(Protocol requestProtocol) throws Exception {
    //判断服务节点状态
    if(ServerState.Reboot == state || ServerState.Dead == state) {
        throw new RebootException();
    };
    ....
    //注册窗口事件
    socket.registerRec(requestProtocol.getSessionID());
    //异步发送请求
    socket.send(data);
    ....
    
    //接收数据,等待数据到达事件
    byte[] buffer = socket.receive(requestProtocol.getSessionID().currUserCount);
    Protocol receiveProtocol = Protocol.fromBytes(buffer,socket.isRights(),socket.getDESKey());
    
    //接收回包首先判断类型是否是重启
    SDPType sdpType = receiveProtocol.getSDPType();
    if(sdpType == SDPType.Reset){
        //如果是重启 服务节点置为不可用
        this.asReboot();
        logger.info("server: [{}], address : [{}] was rebooted, will choose another one !",
        new Object[]{this.getName(),this.getAddress()});
        //抛出服务重启异常
        throw new RebootException();
    }
}

设置服务为重启状态

public void asReboot() {
    if(ServerState.Reboot != this.getState()) {
        this.setState(ServerState.Reboot);
        this.setDeadTime(System.currentTimeMillis());
        this.setWeight(-1);
        this.getSocketPool().destroy();
        ServerStateDetector.instance().add(this);
        logger.debug("this server is reboot! host: "+ this.getAddress());
    }
}
  • 节点探活

过载保护

服务提供方为保证正常运行,主动丢弃超出处理能力的请求。 这里的主动丢弃与超时丢弃不同。每个请求入队时判断队列中的请求数是否达到阈值,超过直接丢弃。

入队任务代码

publick void run(Group group, IAsyncHandler handler){
    //构造 AsyncTask 对象放入队列
    AsyncTask task = new AsyncTask(taskTimeout,handler);
    balance(group,task);
}
public void balance(Group group,AsyncTask task) {
    if(group == Group.HIGH) {
        balanceTask(highFactor.getAndIncrement(), groupHighWorkers, task);
    }else if(group == Group.DEFAULT){
        balanceTask(defaultFactor.getAndIncrement(), defaultWorkers, task);
    }
}
public void balanceTask(int factor, AsyncWorker[] workers, AsyncTask task) {
    int idx = factor % workers.length;
    //如果预设值大于0
    if(limitSize>0){
        workers[idx].addTask(task,limitSize,mode);
    }else{
        worker[idx].addTask(task);
    }
}
void addTask(AsyncTask task,int limitSize,boolean abortNewTask) {
    //如果请求队列长度超出了预设值  抛弃新来的任务
    if(this.taskQueue.size() >= limitSize){
        if(abortNewTask) {
            task.getHandler().exceptionCaught(new TimeoutException(threadFactoryName + 
            "abort this task, because the queue is full!"));
        }else{
            elimintateOldTask(task);
        }
    }else{
        this.queue.offer(task);
    }
}