likes
comments
collection
share

手撸RPC框架 -SPI机制扩展随机负载均衡策略

作者站长头像
站长
· 阅读数 22

大家好,我是小趴菜,接下来我会从0到1手写一个RPC框架,该专题包括以下专题,有兴趣的小伙伴就跟着我一起学习吧

本章源码地址:gitee.com/baojh123/se…

自定义注解 -> opt-01
服务提供者收发消息基础实现 -> opt-01
自定义网络传输协议的实现 -> opt-02
自定义编解码实现 -> opt-03
服务提供者调用真实方法实现 -> opt-04
完善服务消费者发送消息基础功能 -> opt-05
注册中心基础功能实现 -> opt-06
服务提供者整合注册中心 -> opt-07
服务消费者整合注册中心 -> opt-08
完善服务消费者接收响应结果 -> opt-09
服务消费者,服务提供者整合SpringBoot -> opt-10
动态代理屏蔽RPC服务调用底层细节 -> opt-10
SPI机制基础功能实现 -> opt-11
SPI机制扩展随机负载均衡策略 -> opt-12
SPI机制扩展轮询负载均衡策略 -> opt-13
SPI机制扩展JDK序列化 -> opt-14
SPI机制扩展JSON序列化 -> opt-15
SPI机制扩展protustuff序列化 -> opt-16

前言

在上一章中我们已经完成了SPI注解的实现,以及核心类的实现,并完成了测试。从这章开始,我们将会使用SPI机制扩展负载均衡策略。

实现

新建 xpc-rpc-loadbalancer模块

手撸RPC框架 -SPI机制扩展随机负载均衡策略

  • 负载均衡接口类:com.xpc.rpc.loadbalancer.api.ServiceLoadBalancerApi
package com.xpc.rpc.loadbalancer.api;

import com.xpc.rpc.annotation.SPI;

import java.util.List;

@SPI
public interface ServiceLoadBalancerApi<T> {

      T select(List<T> services);
}
  • 随机负载均衡策略实现类:com.xpc.rpc.loadbalancer.random.RamdomLoadbalancer
package com.xpc.rpc.loadbalancer.random;

import com.xpc.rpc.annotation.SPIClass;
import com.xpc.rpc.loadbalancer.api.ServiceLoadBalancerApi;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.List;
import java.util.Random;

@SPIClass
public class RamdomLoadbalancer<T> implements ServiceLoadBalancerApi<T> {

    private static final Logger LOGGER = LoggerFactory.getLogger(RamdomLoadbalancer.class);

    @Override
    public T select(List<T> services) {
        LOGGER.info("这是随机负载均衡策略..........................");
        if(services == null || services.isEmpty())  {
            return null;
        }
        Random random = new Random();
        int index = random.nextInt(services.size());
        return services.get(index);
    }
}

然后在 resources目录下新建一个文件

手撸RPC框架 -SPI机制扩展随机负载均衡策略

文件内容如下

random=com.xpc.rpc.loadbalancer.random.RamdomLoadbalancer

修改 xpc-rpc-register-common 下的com.xpc.rpc.register.common.config.RegisterConfig,新增一个配置项

/**
 * 负载均衡类型
 */
private String loadbalancerType;

修改 xpc-rpc-consumer-common下的 com.xpc.rpc.consumer.config.RpcConsumerConfig,新增一个配置项

/**
 * 负载均衡类型
 */
private String loadbalancerType;

修改 xpc-rpc-register-common 下的 com.xpc.rpc.register.common.RegisterService

在服务发现接口新增一个负责均衡类型参数

package com.xpc.rpc.register.common;

import com.xpc.rpc.protocol.meta.ServiceMeta;
import com.xpc.rpc.register.common.config.RegisterConfig;

public interface RegisterService {

      /**
       * 服务注册
       * @param serviceMeta 服务注册元数据
       */
      void register(ServiceMeta serviceMeta) throws Exception;

      /**
       * 服务发现
       * @param serviceName
       * @loadbalancerType: 负载均衡策略类型
       * @return
       * @throws Exception
       */
      ServiceMeta discovery(String serviceName,String loadbalancerType) throws Exception;

      /**
       * 初始化方法
       */
      void init(RegisterConfig registerConfig);
}

修改它的实现方法

package com.xpc.rpc.register.zookeeper;

import com.xpc.rpc.loadbalancer.api.ServiceLoadBalancerApi;
import com.xpc.rpc.protocol.meta.ServiceMeta;
import com.xpc.rpc.register.common.RegisterService;
import com.xpc.rpc.register.common.config.RegisterConfig;
import com.xpc.rpc.spi.loader.ExtensionLoader;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.curator.x.discovery.ServiceDiscovery;
import org.apache.curator.x.discovery.ServiceDiscoveryBuilder;
import org.apache.curator.x.discovery.ServiceInstance;
import org.apache.curator.x.discovery.details.JsonInstanceSerializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Collection;
import java.util.List;

/**
 * zookeeper注册中心功能实现
 */
public class ZookeeperRegisterServiceImpl implements RegisterService {

    //其它代码省略
    
    
    @Override
    public ServiceMeta discovery(String interfaceName,String loadbalancerType) throws Exception {
        Collection<ServiceInstance<ServiceMeta>> serviceInstances = serviceDiscovery.queryForInstances(interfaceName);
        
        
        //通过SPI机制以及负载均衡类型获取对应的负载均衡策略
        ServiceLoadBalancerApi serviceLoadBalancerApi = ExtensionLoader.getExtension(ServiceLoadBalancerApi.class, loadbalancerType);
        ServiceInstance<ServiceMeta> serviceInstance = (ServiceInstance<ServiceMeta>)serviceLoadBalancerApi.select((List<ServiceInstance<ServiceMeta>>) serviceInstances);
       
       //返回一个ServiceData
        return serviceInstance.getPayload();
    }

  
}

测试

先启动服务提供者服务 xpc-rpc-web-provider

手撸RPC框架 -SPI机制扩展随机负载均衡策略

在服务消费者服务 xpc-rpc-web-consumer 的 application.yml新增负载均衡类型

xpc:
  registerAddress: 127.0.0.1:2181
  registerType: zookeeper
  registerPort: 21770
  packageName: com.xpc
  loadbalancerType: random
server:
  port: 8081

启动服务消费者服务,然后在浏览器上访问 http://localhost:8081/consumer

手撸RPC框架 -SPI机制扩展随机负载均衡策略

可以看到已经通过我们的随机负载均衡策略去获取一个服务了

转载自:https://juejin.cn/post/7254861171823558716
评论
请登录