【BlossomRPC】服务端与客户端服务解析注解的实现
服务端注解的实现
我们的项目既然基于Spring,那么对于这个RpcServiceDeclaration这个注解的处理其实就非常简单了。 我们直接扫描所有的Bean对象,然后得到带有当前注解的类信息,然后将其信息封装一下,注册到注册中心即可。 这里我们首先思考一下注册上去需要那些基础信息:
- 服务ip
- 服务端口
- 服务名称
同时,我们可以简单的进行一下优化,因为其实我们所有的请求并不一定都需要发送到注册中心中进行处理,因为我们有可能收到根本不存在的方法。因此,我们可以简单的使用一个本地的Cache来存储服务是否存在于注册中心,因此,我们可以在项目启动的时候,不单单将服务信息注册到注册中心,还可以同时缓存一份到本地,用来判断到底有哪些服务我们可以从配置中心中获取。
我们首先实现第一步,在项目启动的时候,将带有RpcServiceDeclaration注解的服务信息注册到注册中心。
这里我们需要扫描Spring容器中所有的Bean了,这里我们可以用BeanPostProcessor。
对于PostProcessor/Aware/Factory的区别,可以看星球内部的如下几个文档进行学习
我们在项目启动的时候对所有的Bean进行扫描,然后通过我们的注册中心,注册上去。
@Override
public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException {
//扫描存在当前注解的所有方法
if (bean.getClass().isAnnotationPresent(RpcServiceDeclaration.class)) {
Method[] methods = bean.getClass().getDeclaredMethods();
for (Method method : methods) {
//存放这些方法的信息到rpcmethod-cache中
//TODO 在server收到rpc请求的时候,从这个cache中
//拿到请求然后调用对应的method的请求方法
String serviceName = bean.getClass().getInterfaces()[0].getName();
String key = serviceName + "." + method.getName();
RpcServiceMethod rpcServiceMethod = new RpcServiceMethod();
rpcServiceMethod.setMethodPath(bean);
rpcServiceMethod.setMethod(method);
RpcServiceMethodCache.METHOD_CACHE.put(key, rpcServiceMethod);
RpcServiceInstance instance = RpcServiceInstance.builder()
.serviceName(serviceName)
.serviceIp(this.serverAddress)
.servicePort(this.serverPort)
.build();
try {
//调用注册中心进行服务注册
registerService.register(instance);
} catch (Exception e) {
e.printStackTrace();
log.error("register serivce {} failed ,reason: {}", serviceName, e);
}
}
}
return bean;
}
代码中的METHOD_CAHCE的作用就是用来缓存我们当前项目中到底都提供了那些的方法,这是一个类.方法的缓存,作用就是后续方便我们在运行的时候方便我们通过反射的方式来调用方法,然后得到方法的返回值然后返回给客户端。到时候需要用到的时候我们会讲到。
//调用注册中心进行服务注册
registerService.register(instance);
如上的代码就是我们通过注册中心的方式将服务实例信息注册到注册中心上去。
这里我提供了三种实现。
分别是基于SPI方式注入的自研注册中心的实现、基于Nacos注册中心、基于zk注册中心的实现。
这里我以Nacos注册中心为例,当我启动服务端的项目的时候,会自动检测所有带有RpcServiceDeclaration注解的服务,并将其注册到注册中心。
然后我们查看Nacos服务控制台。
到此,我们就成功的实现了项目启动时的服务注册。
当然,这里讲的比较宽泛,我会在接下来的注册中心的章节详细讲解注册中心的实现。
客户端注解的处理
完成了服务端服务的注册,那么接下来,我们就要考虑,如何对客户端的使用了对应注解的属性进行代理,生成代理对象,而这个代理对象需要做如下的事情:
- 扫描所有的RpcAutowiredProxy注解
- 对注解对象进行代理,生成当前属性的代理对象
- 使用代理对象发送请求的时候,会通过Netty发送封装好的请求到服务端
- 服务端处理请求并返回响应
- 客户端接收到响应 这里我个人认为客户端的处理比较复杂,但是具体的思路都是不变的。 就是通过spring扫描所有的bean,并且对bean中带有RpcAutowiredProxy注解的属性进行代理,生成它的代理对象。 这里我提供了两种实现思路,先来看第一种,也是用的比较多的一种。 这种写法的含义就是通过后置处理器扫描所有的bean,并且对他们的属性进行处理,得到所有带有RpcAutowiredProxy注解的属性,然后对这些属性的bean定义信息进行注册,注册按照我们的逻辑来进行注册。 这里的逻辑就是:创建代理对象。
package blossom.project.rpc.core.proxy.spring.client;
import blossom.project.rpc.common.register.RegisterService;
import blossom.project.rpc.core.proxy.spring.SpringRpcProperties;
import blossom.project.rpc.core.proxy.spring.annotation.RpcAutowiredProxy;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.BeanClassLoaderAware;
import org.springframework.beans.factory.config.BeanDefinition;
import org.springframework.beans.factory.config.BeanFactoryPostProcessor;
import org.springframework.beans.factory.config.ConfigurableListableBeanFactory;
import org.springframework.beans.factory.support.BeanDefinitionBuilder;
import org.springframework.beans.factory.support.BeanDefinitionRegistry;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.core.annotation.AnnotationUtils;
import org.springframework.util.ClassUtils;
import org.springframework.util.ReflectionUtils;
import java.lang.reflect.Field;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Stream;
/**
* @author: ZhangBlossom
* @date: 2023/12/20 21:00
* @contact: QQ:4602197553
* @contact: WX:qczjhczs0114
* @blog: https://blog.csdn.net/Zhangsama1
* @github: https://github.com/ZhangBlossom
* SpringRpcAutowiredProxyProstProcessor类
* 当前类应该做到如下的事情
* 1: 得到所有RpcAutowiredProxy注解的属性,对这些属性进行代理修改
*/
@Slf4j
public class SpringRpcAutowiredProxyProcessor implements
ApplicationContextAware, BeanClassLoaderAware, BeanFactoryPostProcessor {
private ApplicationContext context;
private ClassLoader classLoader;
private final SpringRpcProperties properties;
private final RegisterService registerService;
// 保存代理bean的定义信息
private final Map<String, BeanDefinition> rpcBeanDefinitionCache = new ConcurrentHashMap<>();
public SpringRpcAutowiredProxyProcessor(SpringRpcProperties properties, RegisterService registerService) {
this.properties = properties;
this.registerService = registerService;
}
@Override
public void setBeanClassLoader(ClassLoader classLoader) {
this.classLoader = classLoader;
}
@Override
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
this.context = applicationContext;
}
/**
* 在Bean工厂配置阶段处理带有@RpcAutowiredProxy注解的字段。
* 遍历所有的bean定义。
* 对于每个bean定义,使用反射(ReflectionUtils.doWithFields)来访问它的所有字段。
* 对每个字段,调用 resolveRpcAutowiredProxy 方法。
* ReflectionUtils.doWithFields
* ReflectionUtils.doWithFields 是Spring的一个工具方法,
* 用于对给定类的所有字段执行某个操作。这个方法接受两个参数:
* 一个类和一个 FieldCallback。对于类中的每个字段,都会调用这个FieldCallback。
*
* ReflectionUtils.doWithFields(clazz, this::resolveRpcAutowiredProxy)
* 被用来检查每个bean的所有字段,并对每个字段应用
* resolveRpcAutowiredProxy 方法。
*
* resolveRpcAutowiredProxy 方法
* 这个方法用来处理具有 @RpcAutowiredProxy 注解的字段。
* 对于这些字段,它创建一个新的 SpringRpcClientProxy bean定义,并将其添加到
* rpcBeanDefinitionCache(最终注册到Spring容器中)。
* @param beanFactory Bean工厂
* @throws BeansException
*/
@Override
public void postProcessBeanFactory(ConfigurableListableBeanFactory beanFactory) throws BeansException {
Stream.of(beanFactory.getBeanDefinitionNames())
.map(beanFactory::getBeanDefinition)
.map(BeanDefinition::getBeanClassName)
.filter(Objects::nonNull)
.map(className -> ClassUtils.resolveClassName(className, this.classLoader))
.forEach(clazz -> ReflectionUtils.doWithFields(clazz, this::resolveRpcAutowiredProxy));
BeanDefinitionRegistry registry = (BeanDefinitionRegistry) beanFactory;
rpcBeanDefinitionCache.forEach((beanName, beanDefinition) -> {
if (!context.containsBean(beanName)) {
registry.registerBeanDefinition(beanName, beanDefinition);
log.info("registe bean {} successfully...", beanName);
} else {
log.warn("Spring Context has contain bean {}", beanName);
}
});
}
/**
* 处理带有@RpcAutowiredProxy注解的字段,创建相应的代理Bean定义。
*
* @param field 类字段
*/
private void resolveRpcAutowiredProxy(Field field) {
Optional.ofNullable(AnnotationUtils
.getAnnotation(field, RpcAutowiredProxy.class))
.ifPresent(rpcAutowiredProxy -> {
BeanDefinitionBuilder builder =
BeanDefinitionBuilder.genericBeanDefinition(SpringRpcClientProxy.class);
builder.setInitMethodName("generateProxy");
builder.addPropertyValue("interfaceClass", field.getType());
builder.addPropertyValue("registerService", this.registerService);
builder.addPropertyValue("registerAddress", properties.getRegisterAddress());
builder.addPropertyValue("registerName", properties.getRegisterName());
builder.addPropertyValue("loadBalanceStrategy", properties.getLoadBalanceStrategy());
builder.addPropertyValue("clientProperties", properties);
BeanDefinition beanDefinition = builder.getBeanDefinition();
rpcBeanDefinitionCache.put(field.getName(), beanDefinition);
});
}
}
//注册开始的时候会调用如下方法进行注册
public void generateProxy(){
//RegisterService registerService= RegisterFactory.createRegisterService(
// clientProperties.getRegisterAddress(),
// RegisterTypeEnum.findByName(clientProperties.getRegisterName()),
// LoadBalanceFactory.getLoadBalanceStrategy(
// LoadBalanceTypeEnum.findByName
// (this.loadBalanceStrategy)
// ));
this.object= Proxy.newProxyInstance(interfaceClass.getClassLoader(),
new Class<?>[]{interfaceClass},
new JdkRpcProxyInvocationHandler(registerService));
}
这里我还提供了第二种实现方式。这种方式更加简单好理解。 依旧是通过扫描所有带有注解的属性,然后对这些属性使用代理工厂,对他们生成代理对象。
package blossom.project.rpc.core.proxy.spring.client;
import blossom.project.rpc.common.register.RegisterService;
import blossom.project.rpc.core.proxy.spring.SpringRpcProperties;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.config.BeanPostProcessor;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.aop.framework.ProxyFactory;
import org.aopalliance.intercept.MethodInterceptor;
import org.springframework.stereotype.Component;
import org.springframework.util.ReflectionUtils;
import blossom.project.rpc.core.proxy.spring.annotation.RpcAutowiredProxy;
import java.lang.reflect.Field;
/**
* @author: ZhangBlossom
* @date: 2023/12/19 21:56
* @contact: QQ:4602197553
* @contact: WX:qczjhczs0114
* @blog: https://blog.csdn.net/Zhangsama1
* @github: https://github.com/ZhangBlossom
* SpringRpcAutowiredProxyProcessorGentle类
* Spring Bean后置处理器,用于处理RpcAutowiredProxy注解标记的字段。
*/
@Slf4j
public class SpringRpcAutowiredProxyProcessorGentle
implements BeanPostProcessor {
@Autowired
private ApplicationContext applicationContext;
private final RegisterService registerService;
public SpringRpcAutowiredProxyProcessorGentle(RegisterService registerService) {
this.registerService = registerService;
}
/**
* 在bean初始化之前执行的操作,用于处理RpcAutowiredProxy注解。
*/
@Override
public Object postProcessBeforeInitialization(Object bean, String beanName)
throws BeansException {
// 遍历Bean的所有字段,查找RpcAutowiredProxy注解
ReflectionUtils.doWithFields(bean.getClass(), field -> {
RpcAutowiredProxy rpcAutowiredProxy = field.getAnnotation(RpcAutowiredProxy.class);
if (rpcAutowiredProxy != null) {
// 如果存在RpcAutowiredProxy注解,创建代理对象
ProxyFactory proxyFactory = new ProxyFactory();
//设定代理对象的类型
proxyFactory.setTargetClass(field.getType());
//ProxyFactory 用于创建代理对象。我们为其设置目标类,
// 并添加一个MethodInterceptor,这样每次方法调用都会经过这个拦截器。
proxyFactory.addAdvice((MethodInterceptor) invocation ->
new JdkRpcProxyInvocationHandler(registerService)
.invoke(invocation.getThis(), invocation.getMethod(), invocation.getArguments())
);
// 获取代理对象并设置到字段
Object proxy = proxyFactory.getProxy();
field.setAccessible(true);
field.set(bean, proxy);
}
});
return bean;
}
@Override
public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException {
// 初始化后的逻辑可以在这里实现
return bean;
}
}
到这里,我们就已经完成了对带有注解的属性的代理对象的创建。 接下来我们就需要思考具体的逻辑,也就是如何实现客户端使用代理对象发送请求,同时服务端接收到请求之后能对请求进行解析和处理。 这里,我们先完成对代理对象逻辑的处理,在代理对象中,我们大概知道,要做的事情就是: 通过之前定义的RPC请求信息,封装这些请求信息,然后将请求信息通过Netty通道发送到服务端,服务端解析这些信息,然后反射的调用客户端要调用的方法,然后得到值之后返回给客户端。 因此很明确,我们需要确保客户端发送的请求,带有要调用的方法的完整信息和参数。 所以我们定义如下的对象:
@Data
public class RpcRequest implements Serializable {
//public static final long serialVersionUID = 200201141215L;
//调用的服务名称(类似于类名)
private String className;
//调用的目标方法名称
private String methodName;
//请求参数
private Object[] params;
//参数类型method.getParameterTypes()返回什么就用什么类型
//考虑到要用反射
private Class<?>[] paramsTypes;
}
我们知道,在发射调用invoke方法中,我们是可以拿到方法名称,参数,类名称等信息的,因此我们就需要封装这些信息为一个类,然后发送给我们的服务端,然后服务端发射调用方法即可。 同时,我们使用Netty发送请求的时候,是异步等待服务端结果的返回值的。 因此,我们的客户端是需要等待服务端的处理结果的,此时线程会进行阻塞。 那么,什么时候阻塞结束? 自然是等到客户端拿到服务端的处理结果了。 如果我们一旦拿到处理结果,就马上结束阻塞,完成数据的返回。 这里需要注意一个点,就是,在同一时间内,由于异步以及并发的问题,我们怎么知道当前的Netty通道收到的消息是哪一个请求被完成了呢? 因此这里我们可以考虑设置一个Map结构的Cache,其中键为我们的reqId,值我们可以考虑使用Future或者Promise。 因为我们知道Future和Promise作为异步任务,调用get方法的时候如果没有数据,那么就会阻塞,如果拿到了数据,就会马上返回。 我们只需要在Client的Handler里面通过reqId对请求信息进行对应,然后设置Future/Promise的值即可,那么此时客户端的代理方法中的get阻塞就会由于拿到值而马上返回。阻塞也就结束,请求也就自然完成了。 代码如下:
@Override
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
log.info("client start to invoke the server' function!!!");
RpcDto<RpcRequest> dto=new RpcDto<>();
//构建请求头
long reqId = RpcCache.getRequestId();
RpcHeader header=new RpcHeader(RpcCommonConstants.VERSION_ID,
AlgorithmTypeEnum.JSON.getCode(),
ReqTypeEnum.REQUEST.getCode(),
reqId,0);
dto.setHeader(header);
//设定请求内容
RpcRequest request=new RpcRequest();
request.setClassName(method.getDeclaringClass().getName());
request.setMethodName(method.getName());
request.setParamsTypes(method.getParameterTypes());
request.setParams(args);
dto.setData(request);
//TODO 不应该每次都new一个Client,Client应该是复用的
// 考虑用final把 time:2023/12/16 01:12
NettyRpcClient nettyClient=new NettyRpcClient(host,port);
//NettyRpcClient nettyClient=new NettyRpcClient();
//得到一个promise对象,先存起来,等doRequest拿到数据之后就会设定值进去
//那么此时promise的get的阻塞就会结束
DefaultPromise<RpcResponse> promise = new DefaultPromise(new DefaultEventLoop());
RpcCache.RESPONSE_CACHE.put(reqId,promise);
nettyClient.doRequest(dto,registerService);
//TODO 方便debug time:2023/12/16 01:14
Object data = promise.get(5, TimeUnit.SECONDS).getData();
return data;
}
因此,此时我们就需要开始编写Handler了。
转载自:https://juejin.cn/post/7352079441526390795