likes
comments
collection
share

手撸RPC框架 - 服务提供者调用真实方法功能实现

作者站长头像
站长
· 阅读数 20

大家好,我是小趴菜,接下来我会从0到1手写一个RPC框架,该专题包括以下专题,有兴趣的小伙伴就跟着我一起学习吧

本章源码地址:gitee.com/baojh123/se…

自定义注解 -> opt-01
服务提供者收发消息基础实现 -> opt-01
自定义网络传输协议的实现 -> opt-02
自定义编解码实现 -> opt-03
服务提供者调用真实方法实现 -> opt-04
完善服务消费者发送消息基础功能 -> opt-05
注册中心基础功能实现 -> opt-06
服务提供者整合注册中心 -> opt-07
服务消费者整合注册中心 -> opt-08
完善服务消费者接收响应结果 -> opt-09
服务消费者,服务提供者整合SpringBoot -> opt-10
动态代理屏蔽RPC服务调用底层细节 -> opt-10
SPI机制基础功能实现 -> opt-11
SPI机制扩展随机负载均衡策略 -> opt-12
SPI机制扩展轮询负载均衡策略 -> opt-13
SPI机制扩展JDK序列化 -> opt-14
SPI机制扩展JSON序列化 -> opt-15
SPI机制扩展protustuff序列化 -> opt-16

前言

在前面几章我们已经实现了服务提供者收发消息的功能了,但是作为服务提供者,是应该调用我们真实的方法来获取返回结果,然后在封装成响应信息来返回给服务消费者

本章我们就实现一下服务提供者调用真实方法的功能

开干

com.xpc.rpc.provider.server.base.BaseServer 类新增二个成员变量

/**
 * @RpcService标注的类对应的接口的class
 */
protected Map<String,Object> handlerMap = new HashMap<>();

//扫描的包路径
private String packageName;

在 startNettyServer()方法中对handlerMap进行赋值

@Override
public void startNettyServer() {
    try {
        this.handlerMap = DubboServiceScanner.doScanDubboServiceByPackages(packageName);
    } catch (Exception e) {
        LOGGER.error("scan @DubboService error: {}",e);
    }
    
    //其它代码省略

bootstrap.group(bossGroup,workerGroup)
        .channel(useEpoll() ? EpollServerSocketChannel.class : NioServerSocketChannel.class)
        .option(ChannelOption.SO_BACKLOG,128)
        .childOption(ChannelOption.SO_KEEPALIVE,true)
        .childHandler(new ChannelInitializer<SocketChannel>() {
            @Override
            protected void initChannel(SocketChannel channel) throws Exception {
                ChannelPipeline pipeline = channel.pipeline();
                //我们自己的编解码器
                pipeline.addLast(new RpcDecoder());
                pipeline.addLast(new RpcEncoder());
                
                //我们自己实现的处理器
                //将handlerMap传入进去
                pipeline.addLast(new RpcProviderHandler(handlerMap));
            }
        });
}

新增构造方法,对packageName进行赋值

public BaseServer(String packageName) {
    this.packageName = packageName;
}

接下来就是改造服务提供者的自定义处理器了 com.xpc.rpc.provider.handler.RpcProviderHandler

package com.xpc.rpc.provider.handler;

import com.xpc.rpc.common.enums.RpcMsgType;
import com.xpc.rpc.protocol.ProtocolMessage;
import com.xpc.rpc.protocol.header.RpcHeader;
import com.xpc.rpc.protocol.request.RpcRequest;
import com.xpc.rpc.protocol.response.RpcResponse;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.lang.reflect.Method;
import java.util.Map;

public class RpcProviderHandler extends SimpleChannelInboundHandler<ProtocolMessage<RpcRequest>> {

    private static final Logger LOGGER = LoggerFactory.getLogger(RpcProviderHandler.class);

    private Map<String,Object> handlerMap;

    public RpcProviderHandler(Map<String,Object> handlerMap) {
        this.handlerMap = handlerMap;
    }

    @Override
    protected void channelRead0(ChannelHandlerContext ctx, ProtocolMessage<RpcRequest> protocolMessage) throws Exception {
        RpcHeader rpcHeader = protocolMessage.getRpcHeader();
        RpcRequest request = protocolMessage.getT();
        //处理请求消息
        ProtocolMessage<RpcResponse> protocolResponseMessage = handlerMessage(rpcHeader,request);
        //写会响应结果
        ctx.writeAndFlush(protocolResponseMessage);
    }

    /**
     * 利用反射技术调用真实方法获取结果
     * @param rpcHeader
     * @param request
     */
    private ProtocolMessage<RpcResponse> handlerMessage(RpcHeader rpcHeader, RpcRequest request) {
        ProtocolMessage<RpcResponse> protocolMessage = new ProtocolMessage<RpcResponse>();
        rpcHeader.setMsgType(RpcMsgType.RESPONSE.getType());
        protocolMessage.setRpcHeader(rpcHeader);
        RpcResponse response = new RpcResponse();

        String className = request.getClassName();
        if(!handlerMap.containsKey(className)) {
            throw new RuntimeException("no find class for " + className);
        }
        Object classBean = handlerMap.get(className);
        Class<?> beanClass = classBean.getClass();
        Method method;
        Object result = null;
        try {
            method = beanClass.getMethod(request.getMethodName(), request.getParameterTypes());
            method.setAccessible(true);
            result = method.invoke(classBean,request.getParameters());
            response.setData(result);
            response.setCode(200);
        } catch (Exception e) {
            response.setErrMsg(e.getMessage());
            response.setCode(500);
        }
        protocolMessage.setT(response);
        return protocolMessage;
    }
}

测试

修改:com.xpc.test.scanner.DemoService,新增一个方法

package com.xpc.test.scanner;

public interface DemoService {

    String hello(String name);
}

修改它的实现类:com.xpc.test.scanner.DemoServiceImpl

package com.xpc.test.scanner;

import com.xpc.rpc.annotation.DubboService;

@DubboService(interfaceClass = DemoService.class)
public class DemoServiceImpl implements DemoService{

    @Override
    public String hello(String name) {
        return "hello " + name;
    }
}

修改:com.xpc.consumer.handler.RpcConsumerHandler

package com.xpc.rpc.consumer.handler;

import com.xpc.rpc.common.enums.RpcMsgType;
import com.xpc.rpc.protocol.ProtocolMessage;
import com.xpc.rpc.protocol.header.RpcHeader;
import com.xpc.rpc.protocol.request.RpcRequest;
import com.xpc.rpc.protocol.response.RpcResponse;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class RpcConsumerHandler extends SimpleChannelInboundHandler<ProtocolMessage<RpcResponse>> {


    private static final Logger LOGGER = LoggerFactory.getLogger(RpcConsumerHandler.class);

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        //发送一个请求
        ProtocolMessage<RpcRequest> protocolMessage = new ProtocolMessage<RpcRequest>();
        RpcHeader rpcHeader = new RpcHeader();
        rpcHeader.setMsgType(RpcMsgType.REQUEST.getType());
        rpcHeader.setRequestId(1L);

        RpcRequest rpcRequest = new RpcRequest();
        rpcRequest.setClassName("com.xpc.test.scanner.DemoService");
        rpcRequest.setMethodName("hello");
        rpcRequest.setParameterTypes(new Class[]{String.class});
        rpcRequest.setParameters(new Object[]{"coco"});

        protocolMessage.setRpcHeader(rpcHeader);
        protocolMessage.setT(rpcRequest);
        ctx.writeAndFlush(protocolMessage);
    }

    @Override
    protected void channelRead0(ChannelHandlerContext channelHandlerContext, ProtocolMessage<RpcResponse> protocolMessage) throws Exception {
        LOGGER.info("code: {}",protocolMessage.getT().getCode());
        LOGGER.info("data: {}",protocolMessage.getT().getData());
    }
}

先启动服务提供者:com.xpc.test.netty.ProviderTest

package com.xpc.test.netty;

import com.xpc.rpc.provider.server.base.BaseServer;
import org.junit.Test;

public class ProviderTest {


    @Test
    public void startNetty() {
        BaseServer baseServer = new BaseServer("com.xpc");
        baseServer.startNettyServer();
    }
}

然后启动服务消费者:com.xpc.test.netty.ConsumerTest

package com.xpc.test.netty;

import com.xpc.rpc.consumer.RpcConsumer;
import org.junit.Test;

public class ConsumerTest {

    @Test
    public void startConsumer() {
        new RpcConsumer();
    }
}

服务提供者日志:

手撸RPC框架 - 服务提供者调用真实方法功能实现

服务消费者日志:

手撸RPC框架 - 服务提供者调用真实方法功能实现