likes
comments
collection
share

手撸RPC框架 - 服务提供者收发消息基础实现

作者站长头像
站长
· 阅读数 17

大家好,我是小趴菜,接下来我会从0到1手写一个RPC框架,该专题包括以下专题,有兴趣的小伙伴就跟着我一起学习吧

本章源码地址:gitee.com/baojh123/se…

自定义注解 -> opt-01
服务提供者收发消息基础实现 -> opt-01
自定义网络传输协议的实现 -> opt-02
自定义编解码实现 -> opt-03
服务提供者调用真实方法实现 -> opt-04
完善服务消费者发送消息基础功能 -> opt-05
注册中心基础功能实现 -> opt-06
服务提供者整合注册中心 -> opt-07
服务消费者整合注册中心 -> opt-08
完善服务消费者接收响应结果 -> opt-09
服务消费者,服务提供者整合SpringBoot -> opt-10
动态代理屏蔽RPC服务调用底层细节 -> opt-10
SPI机制基础功能实现 -> opt-11
SPI机制扩展随机负载均衡策略 -> opt-12
SPI机制扩展轮询负载均衡策略 -> opt-13
SPI机制扩展JDK序列化 -> opt-14
SPI机制扩展JSON序列化 -> opt-15
SPI机制扩展protustuff序列化 -> opt-16

目标

在前面的章节中,我们已经实现了自定义注解的以及扫描解析自定义注解的功能。作为一个服务提供者来说。首先就需要能够接收到服务消费者发送过来的请求,所以这一章我们来实现一下服务提供者收发消息的基础功能

手撸RPC框架  -  服务提供者收发消息基础实现

Netty服务端功能实现

RPC请求都是通过网络传输,现在比较流行像Mina,还有Netty,我们使用Netty来实现网络请求的接收和发送

新建Netty服务启动与关闭的通用接口 com.xpc.rpc.provider.server.api.Server

package com.xpc.rpc.provider.server.api;

public interface Server {

    /**
     * 启动Netty服务
     */
    void startNettyServer();

    /**
     * 停止Netty服务
     */
    void shutDown();
}

创建Netty服务端具体实现类 com.xpc.rpc.provider.server.base.BaseServer

package com.xpc.rpc.provider.server.base;

import com.xpc.rpc.common.utils.RemotingUtil;
import com.xpc.rpc.provider.handler.RpcProviderHandler;
import com.xpc.rpc.provider.server.api.Server;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.*;
import io.netty.channel.epoll.Epoll;
import io.netty.channel.epoll.EpollEventLoopGroup;
import io.netty.channel.epoll.EpollServerSocketChannel;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class BaseServer implements Server {

    private static final Logger LOGGER = LoggerFactory.getLogger(BaseServer.class);

    private EventLoopGroup bossGroup;

    private EventLoopGroup workerGroup;

    private ServerBootstrap bootstrap;

    @Override
    public void startNettyServer() {
        /**
         * 性能优化
         */
        if(useEpoll()) {
            bossGroup = new EpollEventLoopGroup();
            workerGroup = new EpollEventLoopGroup();
        }else {
            bossGroup = new NioEventLoopGroup();
            workerGroup = new NioEventLoopGroup();
        }
        bootstrap = new ServerBootstrap();

        bootstrap.group(bossGroup,workerGroup)
                .channel(useEpoll() ? EpollServerSocketChannel.class : NioServerSocketChannel.class)
                .option(ChannelOption.SO_BACKLOG,128)
                .childOption(ChannelOption.SO_KEEPALIVE,true)
                .childHandler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    protected void initChannel(SocketChannel channel) throws Exception {
                        ChannelPipeline pipeline = channel.pipeline();
                        //Netty自带的String类型编解码器
                        pipeline.addLast(new StringDecoder());
                        pipeline.addLast(new StringEncoder());
                        //我们自己实现的处理器
                        pipeline.addLast(new RpcProviderHandler());
                    }
                });

        try {
            ChannelFuture channelFuture = bootstrap.bind("127.0.0.1", 21778).sync();
            LOGGER.info("Netty 服务端启动成功............");
            channelFuture.channel().closeFuture().sync();
        } catch (InterruptedException e) {
            LOGGER.error("Netty 服务端启动失败:{}",e);
        }finally {
            shutDown();
        }
    }

    @Override
    public void shutDown() {
        if(bossGroup != null) {
            bossGroup.shutdownGracefully();
        }
        if(workerGroup != null) {
            workerGroup.shutdownGracefully();
        }
    }

    private boolean useEpoll() {
        return RemotingUtil.isLinuxPlatform() && Epoll.isAvailable();
    }
}

接下来就是服务端我们自己定义的处理器 com.xpc.rpc.provider.handler.RpcProviderHandler,这里就是后续服务提供者接收到请求之后具体的处理类,所有的业务逻辑都会在这里实现

package com.xpc.rpc.provider.handler;

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class RpcProviderHandler extends SimpleChannelInboundHandler<String> {

    private static final Logger LOGGER = LoggerFactory.getLogger(RpcProviderHandler.class);

    @Override
    protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {
        LOGGER.info("服务端收到消息:{}",msg);
        //这里再写回一条消息给客户端
        ctx.writeAndFlush("你好,我是服务端........");
    }
}

至此服务端的代码就实现完了,接下来就是服务系消费者发送消息的基础功能实现,创建一个子工程 xpc-rpc-consumer,pom.xml文件如下

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <parent>
        <artifactId>xpc-rpc</artifactId>
        <groupId>com.xpc</groupId>
        <version>1.0-SNAPSHOT</version>
    </parent>
    <modelVersion>4.0.0</modelVersion>

    <artifactId>xpc-rpc-consumer</artifactId>


</project>

创建启动类 com.xpc.rpc.consumer.RpcConsumer

package com.xpc.rpc.consumer;

import com.xpc.rpc.consumer.handler.RpcConsumerHandler;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.GenericFutureListener;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;


public class RpcConsumer {

     private static final Logger LOGGER = LoggerFactory.getLogger(RpcConsumer.class);

     private Bootstrap bootstrap;

     private EventLoopGroup eventLoopGroup;


     public RpcConsumer() {

     }

    /**
     * 启动服务消费者
     */
    public void start() {
         eventLoopGroup = new NioEventLoopGroup();
         bootstrap = new Bootstrap();

         bootstrap.group(eventLoopGroup)
                 .channel(NioSocketChannel.class)
                 .handler(new ChannelInitializer<SocketChannel>() {
                     @Override
                     protected void initChannel(SocketChannel channel) throws Exception {
                         ChannelPipeline pipeline = channel.pipeline();
                         //String类型的编解码器
                         pipeline.addLast(new StringDecoder());
                         pipeline.addLast(new StringEncoder());
                         //自定义处理器
                         pipeline.addLast(new RpcConsumerHandler());
                     }
                 });
         try {
             ChannelFuture channelFuture = bootstrap.connect("127.0.0.1", 21778).sync();
             channelFuture.addListener(new GenericFutureListener<Future<? super Void>>() {
                 @Override
                 public void operationComplete(Future<? super Void> future) throws Exception {
                     if(future.isSuccess()) {
                         LOGGER.info("客户端连接成功........");
                     }else {
                         LOGGER.info("客户端连接失败........");
                     }
                 }
             });
             channelFuture.channel().closeFuture().sync();
         } catch (InterruptedException e) {
             e.printStackTrace();
         }
     }

}

创建自定义处理器 com.xpc.rpc.consumer.handler.RpcConsumerHandler

package com.xpc.rpc.consumer.handler;

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class RpcConsumerHandler extends SimpleChannelInboundHandler<String> {


    private static final Logger LOGGER = LoggerFactory.getLogger(RpcConsumerHandler.class);

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        LOGGER.info("开始发送消息..........");
        String msg = "hello rpc";
        ctx.writeAndFlush(msg);
    }

    @Override
    protected void channelRead0(ChannelHandlerContext channelHandlerContext, String s) throws Exception {
        LOGGER.info("这是服务端发送过来的消息: {}",s);
    }
}

到此服务提供者收发消息的基础功能已经实现

测试

写好的功能怎么能不测试呢

xpc-rpc-test 下新建一个服务提供者的测试类 com.xpc.test.netty.ProviderTest

package com.xpc.test.netty;

import com.xpc.rpc.provider.server.base.BaseServer;
import org.junit.Test;

public class ProviderTest {


    @Test
    public void startNetty() {
        BaseServer baseServer = new BaseServer();
        baseServer.startNettyServer();
    }
}

xpc-rpc-test 下新建一个服务消费者的测试类 com.xpc.test.netty.ConsumerTest

package com.xpc.test.netty;

import com.xpc.rpc.consumer.RpcConsumer;
import org.junit.Test;

public class ConsumerTest {

    @Test
    public void startConsumer() {
        RpcConsumer rpcConsumer = new RpcConsumer();
        rpcConsumer.start();
    }
}

先启动服务提供者 ProviderTest,然后再启动服务消费者

服务提供者日志: 手撸RPC框架  -  服务提供者收发消息基础实现

服务消费者日志:

手撸RPC框架  -  服务提供者收发消息基础实现