微服务组件之RPC(中)
RPC 基础功能
- 数据传输
- 序列化/反序列化
- 客户端代理类
- 请求映射分发
RPC 产品功能
除了RPC基础功能为,RPC还具备八大功能模块 Consumer:连接管理、负载均衡、请求路由、超时处理 Provider:队列/线程池、超时丢弃、优雅关闭、过载保护
Consumer 核心设计
完整的consumer功能模块图
- 请求路由
通过一系列规则过滤出可以选择的provider节点列表,在应用隔离,读写分离灰度发布中发挥关键作用。
- 匹配规则
- 规则描述
- 待比较属性
- 运算符
- 属性匹配值
- 匹配节点
- 数据结构设计
- 链表
以IP分流规则举例
- 链表
以IP分流规则举例
- 规则描述
- 行为
- 链表
- 匹配规则
- 连接管理
保持与Provider的长连接,用于传输数据和返回结果,通常情况下基于tcp连接,udp&http也可以实现。
- 初始化时机
- 饿汉模式-服务间连接、数据库连接
- 懒汉模式-网关连接
- 连接数维护
- 服务连接池
- 数据库连接池
- 心跳/断线重连机制
- client线程模型
- 初始化时机
- 负载均衡
确保多个Provider实例节点流量均匀合理(并非绝对平均);支持节点扩容缩容与灰度发布(调整流量权重比例)
- 轮询 —— 按顺序轮流分发请求
- 随机 —— 随机选择分发请求
- 取模 —— 请求数 % 实例数得到集群下标,将请求分发给指定下标的集群实例。
- 权重 —— 根据实例的性能设置权重值进行请求分发
- 一致性hash —— 根据hash值将 0 - 2^32次方 按照实例数划分为不同的hash范围,根据范围值分发请求
- 超时处理
对于长时间没有响应的请求,需要作异常处理,及时释放资源。
- 工作线程阻塞位置
- 等待响应通知
- 超时逻辑
- 工作线程等待通知
- 数据返回终止等待
- 超时抛出异常
- 数据结构
- Map<key:sessionID,value:WindowData>
- 工作线程阻塞位置
Provider 核心设计
队列/线程池
将不同类型的请求,放入各自的队列,每个队列分配独立的线程池,进行资源隔离.
这里的Thread Pool并非指java中的线程池对象,这里是真正的线程池。
Provider 作为服务提供方,对调用方的信息是无感知的,随业务发展可能会有N个consumer对provider进行调用,工作线程有限,因而需要设计数据结构(队列)对请求进行存放。
数据结构设计分析:
- 请求量多大?是否需要削峰填谷缓存请求?
- 是否存在请求顺序性要求?
- 请求之间相互隔离?不同的请求放不同的队列,不同队列使用不同线程池,资源隔离 线程与线程之间什么样的通信方式是最安全的? 通过通信方式来共享内存,不要通过共享内存来通信。线程与线程之间通过队列方式通信是最好的。 补充知识: OS分配资源的基本单位是进程,cpu调度的基本单位是线程。
线程分配的合理数: 线程数 = CPU核数 * 2 + 2;线程过多会造成线程等待和上下文切换的资源开销;线程太少又无法充分利用cpu资源。
这里就不提供压测截图了,关于压测有兴趣可以百度自行了解,或者找测试同学沟通 QPS 队列\线程 | 2w+ | 5w+ | 8w+ | 11w+
- 单队列多线程 1*64 | 0.00s | 0.01/0.00s | 0.05/0.00s | 0.19/0.01s
- 多队列单线程 64*1 | 0.00s | 0.01/0.00s | 0.02/0.00 | 0.07/0.01s
在QPS 8w+ 时 单队列多线程模型的耗时明显比多队列单线程高
超时丢弃
快速失败已超时的请求,缓解队列的压力
- IO线程处理反序列化后将请求放入队列(req Queue)
入队代码
publick void run(Group group, IAsyncHandler handler){
//构造 AsyncTask 对象放入队列
AsyncTask task = new AsyncTask(taskTimeout,handler);
balance(group,task);
}
//异步任务构造方法
public AsyncTask(int timeout,IAsyncHandler handler){
super();
if(timeout < 0){
timeout = 1000;
}
this.timeout = timeout;//超时时间
this.handler = handler;
this.addTime = System.currentTimeMillis();//入队时间
}
- 当线程池中工作线程从队列中取出请求进行处理时,判断当前请求是否超时。超时则丢弃,未超时则调用服务处理
出队代码
public void run() {
while(!isStop){
AsyncTask task = null;
try {
task = taskQueue.poll(1500,TimeUnit.MILLISECONDS);//取出队列任务
if(null != task){
execTimeoutTask(task);//判断任务是否超时
}
}catch (InterruptedException e){
logger.error("has error!",e);
}catch (Throwable ex){
if(task !=null ){
task.getHandler().exceptionCaught(ex);
}
}
}
}
public void execTimeoutTask(AsyncTask task) throw {
//当前时间减去入队时间是否大于超时时间 超时回调超时
if(System.currentTimeMillis() - task.getAddTime() > task.getTimeout()){
task.getHandler().exceptionCaught(new TimeoutException(threadFactoryName + "async task timeout!"));
return;
} else {
//没有超时回调处理任务
Object obj = task.getHandler().run();
task.getHandler().messageReceived();
}
}
对于consumer 而言,一个请求调用的超时时间 = 请求队列排队时间(主要耗时) + 程序处理时间
优雅关闭
进程结束前应确保队列中的的请求全部处理完成,这路的处理完成是直接回复 并非处理逻辑。 如何通知调用方?
- 返回数据中带关闭信息 建议
- 专门关闭协议通知调用方 不建议 服务提供方对调用方是无感知的;即便是在知道调用方的情况下这样的设计也会造成循环依赖,且调用可能失败,系统复杂度过高。 返回数据中带关闭信息可以复用现有RPC通道
优雅关闭Server端实现
- 监听关闭信号 kill -12
@Component
public class SignalConfig {
@Autowired
private SignalRegistry signalRegistry
public void init() { signalRegistry.register(); }
}
//监听关闭信号
public void register() {
try {
if(StringUtils.isNotBlank(osName) && (!isMac() && !isWindows())) {
Signal sig = new Signal("USR2");//这里的USR2 其实就是 -12 代表的是用户自定义信号
//这表示当用户通过 kill -12 来 结束某个进程时 回调 operateSignalHandler 函数进行处理
Signal.handle(sig, operateSignalHandler);
Signal sig2 = new Signal("STKFLT");
Signal.handle(sig2, breakHeartbeatSignal);
}
} catch ( Exception e ) {
logger.error("------------ signal register failed ! --------------",e)
}
}
- 改变服务状态
//改变服务状态
@Component
public class OperateSignal implements SignalHandler {
private static Logger logger = LoggerFactory.getLogger(OperateSignal.class);
@Autowired
private RpcContext rpcContetx;
@Override
public void handle(Singal singalName){
logger.info("server : {} current state is : {}",new Object[] {rpcContext.getServiceName(), rpcContext.getServerState()});
//设置当前服务状态为重启
rpcContext.setServerState(ServerStateType.Reboot);
logger.info("server : {} will reboot !", new Object[]{rpcContext.getServiceName()});
}
}
- 通知客户端
//超时处理判断请求没有超时时调用 service方法处理请求
public Object innerInvoke(RpcContext rpcContext, MethodSignature methodSignature) throw Exception {
//这里使用责任链模式维护了RPC上下文对象
requestFilter(context);
Object response = null;
RPCContext.setThreadLocal(context);
..... 省略.....
context.getResponseProtocol().setSdpEntity(response);
responseFilter(context);
RpcContext.clear();
return context;
}
protected void requestFilter(RpcContext context) throws Excpetion {
logger.debug("begin requestFilters");
for(IFilter filter : requestFilters){
if(context.getExecFilter() == ExecFilterType.ALL ||
context.getExecFilter() == ExecFilterType.RequestOnly){
filter.filter(context);
}
logger.debug("end requestFilters");
}
}
public void filter(RpcContext context) throws Exception {
if(serviceContext.getServerState) == ServerStateType.Reboot &&
(protocol.getPlatformType() == PlatformType.Java||protocol.getPlatformType() == PlatformType.PHP)) {
//封装RPC响应
RpcResponse response = new RpcResponse();
ResetProtocol rp = new ResetProtocol();
rp.setMsg("This server is reboot!");
responseProtocol.setSdpEntity(rp);
response.setResponseBuffer(responseProtocol.toBytes(false,null));
context.setRpcResponse(response);
context.setExecFilter(ExecFilterType.None);
//不再继续过滤
context.setDoInvoke(false);
}
}
程序启动后初始化,初始化时 初始化容器、初始化插件、初始化监听信号(重要)、初始化服务 解释: 通常结束进程我们用 kill -9 命令 这里的 9 其实就是一种信号量,用户可以自定义信号量发送给操作系统,可以自定义实现
优雅关闭client端实现
- 根据返回改变节点状态
public Protocol request(Protocol requestProtocol) throws Exception {
//判断服务节点状态
if(ServerState.Reboot == state || ServerState.Dead == state) {
throw new RebootException();
};
....
//注册窗口事件
socket.registerRec(requestProtocol.getSessionID());
//异步发送请求
socket.send(data);
....
//接收数据,等待数据到达事件
byte[] buffer = socket.receive(requestProtocol.getSessionID().currUserCount);
Protocol receiveProtocol = Protocol.fromBytes(buffer,socket.isRights(),socket.getDESKey());
//接收回包首先判断类型是否是重启
SDPType sdpType = receiveProtocol.getSDPType();
if(sdpType == SDPType.Reset){
//如果是重启 服务节点置为不可用
this.asReboot();
logger.info("server: [{}], address : [{}] was rebooted, will choose another one !",
new Object[]{this.getName(),this.getAddress()});
//抛出服务重启异常
throw new RebootException();
}
}
设置服务为重启状态
public void asReboot() {
if(ServerState.Reboot != this.getState()) {
this.setState(ServerState.Reboot);
this.setDeadTime(System.currentTimeMillis());
this.setWeight(-1);
this.getSocketPool().destroy();
ServerStateDetector.instance().add(this);
logger.debug("this server is reboot! host: "+ this.getAddress());
}
}
- 节点探活
过载保护
服务提供方为保证正常运行,主动丢弃超出处理能力的请求。 这里的主动丢弃与超时丢弃不同。每个请求入队时判断队列中的请求数是否达到阈值,超过直接丢弃。
入队任务代码
publick void run(Group group, IAsyncHandler handler){
//构造 AsyncTask 对象放入队列
AsyncTask task = new AsyncTask(taskTimeout,handler);
balance(group,task);
}
public void balance(Group group,AsyncTask task) {
if(group == Group.HIGH) {
balanceTask(highFactor.getAndIncrement(), groupHighWorkers, task);
}else if(group == Group.DEFAULT){
balanceTask(defaultFactor.getAndIncrement(), defaultWorkers, task);
}
}
public void balanceTask(int factor, AsyncWorker[] workers, AsyncTask task) {
int idx = factor % workers.length;
//如果预设值大于0
if(limitSize>0){
workers[idx].addTask(task,limitSize,mode);
}else{
worker[idx].addTask(task);
}
}
void addTask(AsyncTask task,int limitSize,boolean abortNewTask) {
//如果请求队列长度超出了预设值 抛弃新来的任务
if(this.taskQueue.size() >= limitSize){
if(abortNewTask) {
task.getHandler().exceptionCaught(new TimeoutException(threadFactoryName +
"abort this task, because the queue is full!"));
}else{
elimintateOldTask(task);
}
}else{
this.queue.offer(task);
}
}
转载自:https://juejin.cn/post/7145022286427324453