Hand-Writing an RPC Framework - Basic Message Send/Receive for the Service Provider
Hi everyone, I'm 小趴菜. In this series I'll hand-write an RPC framework from scratch. The series covers the topics below; if you're interested, follow along with me.
Source code for this chapter: gitee.com/baojh123/se…
Custom annotations -> opt-01
Basic message send/receive for the service provider -> opt-01
Custom network transport protocol -> opt-02
Custom encoder/decoder implementation -> opt-03
Invoking the real method on the service provider -> opt-04
Completing the service consumer's basic message-sending feature -> opt-05
Basic registry-center functionality -> opt-06
Integrating the service provider with the registry center -> opt-07
Integrating the service consumer with the registry center -> opt-08
Completing response handling on the service consumer -> opt-09
Integrating the service consumer and provider with Spring Boot -> opt-10
Hiding low-level RPC invocation details with dynamic proxies -> opt-10
Basic SPI mechanism implementation -> opt-11
SPI extension: random load-balancing strategy -> opt-12
SPI extension: round-robin load-balancing strategy -> opt-13
SPI extension: JDK serialization -> opt-14
SPI extension: JSON serialization -> opt-15
SPI extension: Protostuff serialization -> opt-16
Goal
In the previous chapters we implemented the custom annotations and the logic that scans and parses them. The first thing a service provider needs is the ability to receive requests sent by service consumers, so in this chapter we'll implement the basic message send/receive functionality for the service provider.
Implementing the Netty Server
RPC requests travel over the network. Popular networking frameworks include Mina and Netty; we'll use Netty to receive and send network requests.
Create a generic interface for starting and stopping the Netty server: com.xpc.rpc.provider.server.api.Server
```java
package com.xpc.rpc.provider.server.api;

public interface Server {

    /**
     * Start the Netty server
     */
    void startNettyServer();

    /**
     * Stop the Netty server
     */
    void shutDown();
}
```
Create the concrete Netty server implementation: com.xpc.rpc.provider.server.base.BaseServer
```java
package com.xpc.rpc.provider.server.base;

import com.xpc.rpc.common.utils.RemotingUtil;
import com.xpc.rpc.provider.handler.RpcProviderHandler;
import com.xpc.rpc.provider.server.api.Server;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.*;
import io.netty.channel.epoll.Epoll;
import io.netty.channel.epoll.EpollEventLoopGroup;
import io.netty.channel.epoll.EpollServerSocketChannel;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class BaseServer implements Server {

    private static final Logger LOGGER = LoggerFactory.getLogger(BaseServer.class);

    private EventLoopGroup bossGroup;
    private EventLoopGroup workerGroup;
    private ServerBootstrap bootstrap;

    @Override
    public void startNettyServer() {
        // Performance optimization: prefer the native epoll transport when available
        if (useEpoll()) {
            bossGroup = new EpollEventLoopGroup();
            workerGroup = new EpollEventLoopGroup();
        } else {
            bossGroup = new NioEventLoopGroup();
            workerGroup = new NioEventLoopGroup();
        }
        bootstrap = new ServerBootstrap();
        bootstrap.group(bossGroup, workerGroup)
                .channel(useEpoll() ? EpollServerSocketChannel.class : NioServerSocketChannel.class)
                .option(ChannelOption.SO_BACKLOG, 128)
                .childOption(ChannelOption.SO_KEEPALIVE, true)
                .childHandler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    protected void initChannel(SocketChannel channel) throws Exception {
                        ChannelPipeline pipeline = channel.pipeline();
                        // Netty's built-in String codecs
                        pipeline.addLast(new StringDecoder());
                        pipeline.addLast(new StringEncoder());
                        // Our own business handler
                        pipeline.addLast(new RpcProviderHandler());
                    }
                });
        try {
            ChannelFuture channelFuture = bootstrap.bind("127.0.0.1", 21778).sync();
            LOGGER.info("Netty server started successfully............");
            channelFuture.channel().closeFuture().sync();
        } catch (InterruptedException e) {
            LOGGER.error("Netty server failed to start", e);
        } finally {
            shutDown();
        }
    }

    @Override
    public void shutDown() {
        if (bossGroup != null) {
            bossGroup.shutdownGracefully();
        }
        if (workerGroup != null) {
            workerGroup.shutdownGracefully();
        }
    }

    private boolean useEpoll() {
        return RemotingUtil.isLinuxPlatform() && Epoll.isAvailable();
    }
}
```
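BaseServer imports com.xpc.rpc.common.utils.RemotingUtil, which lives in the common module and isn't shown in this chapter. A minimal sketch of what that helper might look like (the class and method name come from the import above; the implementation here is my assumption, not the project's actual code):

```java
// Hypothetical sketch of the RemotingUtil helper used by BaseServer#useEpoll().
// The real implementation lives in the xpc-rpc-common module.
public final class RemotingUtil {

    private static final String OS_NAME = System.getProperty("os.name", "");

    private RemotingUtil() {
    }

    /**
     * Epoll is a Linux-only transport, so BaseServer checks the platform
     * before asking Netty whether the native library is available.
     */
    public static boolean isLinuxPlatform() {
        return OS_NAME.toLowerCase().contains("linux");
    }
}
```

Checking the platform first matters because Epoll.isAvailable() triggers an attempt to load the native library, which we can skip entirely on non-Linux hosts.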
Next comes our own server-side handler, com.xpc.rpc.provider.handler.RpcProviderHandler. This is where the service provider will process incoming requests; all of the business logic will live here.
```java
package com.xpc.rpc.provider.handler;

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class RpcProviderHandler extends SimpleChannelInboundHandler<String> {

    private static final Logger LOGGER = LoggerFactory.getLogger(RpcProviderHandler.class);

    @Override
    protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {
        LOGGER.info("Server received message: {}", msg);
        // Write a message back to the client
        ctx.writeAndFlush("Hello, I'm the server........");
    }
}
```
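The decoder → handler → encoder flow above can be exercised without opening a socket, using Netty's EmbeddedChannel. This is an illustrative sketch only: EchoHandler is a stand-in I wrote for RpcProviderHandler so the example is self-contained, and is not part of the project.

```java
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.channel.embedded.EmbeddedChannel;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
import io.netty.util.CharsetUtil;

public class PipelineRoundTrip {

    // Stand-in for RpcProviderHandler: replies to every inbound String
    static class EchoHandler extends SimpleChannelInboundHandler<String> {
        @Override
        protected void channelRead0(ChannelHandlerContext ctx, String msg) {
            ctx.writeAndFlush("reply:" + msg);
        }
    }

    public static void main(String[] args) {
        // Same pipeline order as BaseServer: decoder, encoder, business handler
        EmbeddedChannel channel = new EmbeddedChannel(
                new StringDecoder(), new StringEncoder(), new EchoHandler());

        // Simulate bytes arriving from a consumer; StringDecoder turns them into a String
        channel.writeInbound(Unpooled.copiedBuffer("hello rpc", CharsetUtil.UTF_8));

        // The handler's reply travels back out through StringEncoder as bytes
        ByteBuf out = channel.readOutbound();
        System.out.println(out.toString(CharsetUtil.UTF_8)); // reply:hello rpc
        out.release();
        channel.finish();
    }
}
```

Note that outbound writes from the handler only pass through handlers positioned *before* it in the pipeline, which is why StringEncoder must be added before RpcProviderHandler.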
That completes the server side. Next is the basic message-sending functionality for the service consumer. Create a sub-module xpc-rpc-consumer with the following pom.xml:
```xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <parent>
        <artifactId>xpc-rpc</artifactId>
        <groupId>com.xpc</groupId>
        <version>1.0-SNAPSHOT</version>
    </parent>
    <modelVersion>4.0.0</modelVersion>
    <artifactId>xpc-rpc-consumer</artifactId>
</project>
```
Create the startup class com.xpc.rpc.consumer.RpcConsumer
```java
package com.xpc.rpc.consumer;

import com.xpc.rpc.consumer.handler.RpcConsumerHandler;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.GenericFutureListener;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class RpcConsumer {

    private static final Logger LOGGER = LoggerFactory.getLogger(RpcConsumer.class);

    private Bootstrap bootstrap;
    private EventLoopGroup eventLoopGroup;

    public RpcConsumer() {
    }

    /**
     * Start the service consumer
     */
    public void start() {
        eventLoopGroup = new NioEventLoopGroup();
        bootstrap = new Bootstrap();
        bootstrap.group(eventLoopGroup)
                .channel(NioSocketChannel.class)
                .handler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    protected void initChannel(SocketChannel channel) throws Exception {
                        ChannelPipeline pipeline = channel.pipeline();
                        // String codecs
                        pipeline.addLast(new StringDecoder());
                        pipeline.addLast(new StringEncoder());
                        // Our own handler
                        pipeline.addLast(new RpcConsumerHandler());
                    }
                });
        try {
            ChannelFuture channelFuture = bootstrap.connect("127.0.0.1", 21778).sync();
            channelFuture.addListener(new GenericFutureListener<Future<? super Void>>() {
                @Override
                public void operationComplete(Future<? super Void> future) throws Exception {
                    if (future.isSuccess()) {
                        LOGGER.info("Client connected successfully........");
                    } else {
                        LOGGER.info("Client failed to connect........");
                    }
                }
            });
            channelFuture.channel().closeFuture().sync();
        } catch (InterruptedException e) {
            LOGGER.error("Client interrupted", e);
        } finally {
            // Release the event loop's threads when the channel closes
            eventLoopGroup.shutdownGracefully();
        }
    }
}
```
Create the custom handler com.xpc.rpc.consumer.handler.RpcConsumerHandler
```java
package com.xpc.rpc.consumer.handler;

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class RpcConsumerHandler extends SimpleChannelInboundHandler<String> {

    private static final Logger LOGGER = LoggerFactory.getLogger(RpcConsumerHandler.class);

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        // Send a message as soon as the connection becomes active
        LOGGER.info("Sending message..........");
        String msg = "hello rpc";
        ctx.writeAndFlush(msg);
    }

    @Override
    protected void channelRead0(ChannelHandlerContext channelHandlerContext, String s) throws Exception {
        LOGGER.info("Message from the server: {}", s);
    }
}
```
With that, the basic message send/receive functionality is in place.
Testing
Code isn't done until it's tested.
Under xpc-rpc-test, create a test class for the service provider: com.xpc.test.netty.ProviderTest
```java
package com.xpc.test.netty;

import com.xpc.rpc.provider.server.base.BaseServer;
import org.junit.Test;

public class ProviderTest {

    @Test
    public void startNetty() {
        BaseServer baseServer = new BaseServer();
        baseServer.startNettyServer();
    }
}
```
Under xpc-rpc-test, create a test class for the service consumer: com.xpc.test.netty.ConsumerTest
```java
package com.xpc.test.netty;

import com.xpc.rpc.consumer.RpcConsumer;
import org.junit.Test;

public class ConsumerTest {

    @Test
    public void startConsumer() {
        RpcConsumer rpcConsumer = new RpcConsumer();
        rpcConsumer.start();
    }
}
```
Start the service provider ProviderTest first, then start the service consumer ConsumerTest.
Service provider log: (screenshot in the original post)
Service consumer log: (screenshot in the original post)
Reposted from: https://juejin.cn/post/7252684645992677437