Netty实战篇-手写DubboRpc框架
欢迎大家关注 github.com/hsfxuebao ,希望对大家有所帮助,要是觉得可以的话麻烦给点一下Star哈
1. RPC 基本介绍
rpc是远程调用的一种行为,在数据传输过程中涉及到传输协议,http就是一种传输协议。
RPC(Remote Procedure Call)— 远程过程调用,是一个计算机通信协议。
- 该协议允许运行于一台计算机的程序调用另一台计算机的子程序,而程序员无需额外地为这个交互作用编程。
- 两个或多个应用程序都分布在不同的服务器上,它们之间的调用都像是本地方法调用一样
常见的 RPC 框架有:
- 阿里的Dubbo
- google的gRPC
- Go语言的rpc
- Apache的thrift
- Spring旗下的 Spring Cloud。
2. RPC 调用流程
说明:
- 服务消费方(client)以本地调用方式调用服务
- client stub 接收到调用后负责将方法、参数等封装成能够进行网络传输的消息体
- client stub 将消息进行编码并发送到服务端
- server stub 收到消息后进行解码
- server stub 根据解码结果调用本地的服务
- 本地服务执行并将结果返回给 server stub
- server stub 将返回导入结果进行编码并发送至消费方
- client stub 接收到消息并进行解码】
- 服务消费方(client)得到结果
RPC 的目标就是将 2-8 这些步骤都封装起来
,用户无需关心这些细节,可以像调用本地方法一样即可完成远程服务调用
3. 实现 dubbo RPC(基于 Netty)
3.1 需求说明
-
dubbo 底层使用了 Netty 作为网络通讯框架,要求用 Netty 实现一个简单的 RPC 框架
-
模仿 dubbo,消费者和提供者约定接口和协议,消费者远程调用提供者的服务,提供者返回一个字符串,消费者打印提供者返回的数据。底层网络通信使用 Netty 4.1.20
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<version>4.1.20.Final</version>
</dependency>
3.2 设计说明
- 创建一个接口,定义抽象方法。用于消费者和提供者之间的约定。
- 创建一个提供者,该类需要监听消费者的请求,并按照约定返回数据。
- 创建一个消费者,该类需要透明的调用自己不存在的方法,内部需要使用 Netty 请求提供者返回数据
3.3 代码实现
项目结构:
客户端启动器ClientBootstrap
public class ClientBootstrap {
//定义协议头
public static final String providerName = "HelloService#hello";
public static void main(String[] args) throws InterruptedException {
NettyClient client = new NettyClient();
HelloService serviceProxy = (HelloService) client.getBean(HelloService.class, providerName);//拿到代理对象
// for (; ; ) {
//调用客户端的方法
// Thread.sleep(2000);
String result = serviceProxy.hello("阿昌来也");
System.out.println("客户端调用服务端,结果为:" + result);
// }
}
}
服务端启动器ServerBootstrap
public class ServerBootstrap {
public static void main(String[] args) throws InterruptedException {
NettyServer.startServer("127.0.0.1",7000);
}
}
客户端初始化类NettyClient
public class NettyClient {
//创建线程池
private static ExecutorService executors = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());
private static NettyClientHandler nettyClientHandler;
/**
* 编写方式使用代理模式,获取一个代理对象
* @param serviceClass service类
* @param providerName 协议头
* @return 代理对象
*/
public Object getBean(final Class<?> serviceClass,final String providerName){
return Proxy.newProxyInstance(Thread.currentThread().getContextClassLoader(),
new Class<?>[]{serviceClass},
((proxy, method, args) -> {
//客户端每调用一次就会进入该代码块
//第一次调用
if (nettyClientHandler==null){
startClient0("127.0.0.1",7000);
}
//设置要发送给服务器的信息
//providerName协议头,args传入的参数
nettyClientHandler.setParam(providerName+args[0]);
return executors.submit(nettyClientHandler).get();
}
));
}
//初始化客户端
private static void startClient0(String ipaddr,Integer port){
nettyClientHandler = new NettyClientHandler();
NioEventLoopGroup workerGroup = new NioEventLoopGroup();
try {
Bootstrap bootstrap = new Bootstrap();
Bootstrap clientBootstrap = bootstrap.group(workerGroup)
.channel(NioSocketChannel.class)
.option(ChannelOption.TCP_NODELAY,true)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
ChannelPipeline pipeline = socketChannel.pipeline();
pipeline.addLast(new StringEncoder());
pipeline.addLast(new StringDecoder());
pipeline.addLast(nettyClientHandler);
}
});
clientBootstrap.connect(ipaddr,port).sync();
}catch (InterruptedException e) {
e.printStackTrace();
}
}
}
客户端处理器NettyClientHandler
public class NettyClientHandler extends ChannelInboundHandlerAdapter implements Callable {
private ChannelHandlerContext channelHandlerContext;//上下文
private String result;//调用的返回结果
private String param;//客户端调用方法时的参数
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
cause.printStackTrace();
ctx.close();
}
//收到服务器的数据后就会被调用
@Override
public synchronized void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
System.out.println("channelRead");
result = msg.toString();
notify();//唤醒等待的线程
}
//与服务器连接成功后就会被调用
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
System.out.println("channelActive");
channelHandlerContext = ctx;
}
//被代理对象调用,异步发送数据给服务器,然后阻塞,会等待被唤醒
@Override
public synchronized Object call() throws Exception {
System.out.println("call1");
channelHandlerContext.writeAndFlush(param);
//进行wait阻塞
wait();
System.out.println("call2");
return result;
}
//设置发送的数据
void setParam(String msg){
System.out.println("setParam");
this.param = msg;
}
}
服务端初始化类NettyServer
public class NettyServer {
public static void startServer(String hostname,Integer port) throws InterruptedException {
startServer0(hostname,port);
}
private static void startServer0(String hostname,Integer port) throws InterruptedException {
NioEventLoopGroup boosGroup = new NioEventLoopGroup(1);
NioEventLoopGroup workerGroup = new NioEventLoopGroup(8);
try {
ServerBootstrap bootstrap = new ServerBootstrap();
ServerBootstrap serverBootstrap = bootstrap.group(boosGroup, workerGroup)
// .handler(new LoggingHandler())
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
ChannelPipeline pipeline = socketChannel.pipeline();
pipeline.addLast(new StringDecoder());
pipeline.addLast(new StringEncoder());
pipeline.addLast(new NettyServerHandler());
}
});
System.out.println("服务端启动成功....端口:"+port);
ChannelFuture cf = serverBootstrap.bind(hostname, port).sync();
cf.channel().closeFuture().sync();
}finally {
boosGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
}
服务端处理器NettyServerHandler
public class NettyServerHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
//获取客户端发送来的消息,并调用服务
System.out.println("msg="+msg);
//客户端想要调用服务器的api时,想要满足一定协议的要求才能调用
//比如,我们这里要求,每次发送消息时,都必须要求以"HelloService#hello开头"
if (msg.toString().startsWith("HelloService#hello")){
String result = new HelloServiceImpl().hello(msg.toString().split("HelloService#hello")[1]);
ctx.writeAndFlush(result);
}
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
cause.printStackTrace();
ctx.close();
}
}
客户端接口的真正实现Impl:HelloServiceImpl
public class HelloServiceImpl implements HelloService {
private static int count = 0;
@Override
public String hello(String message) {
System.out.println("客户端发来的消息为:【"+message+"】");
if (message!=null){
return "你好客户端,服务端已经收到了消息"+"调用次数为:【"+(++count)+"】";
}else {
return "消息不能为空";
}
}
}
服务提供方 和 服务消费放 公共部分,约定的接口规范 HelloService
public interface HelloService {
String hello(String message);
}
参考文档
Netty学习和源码分析github地址 Netty从入门到精通视频教程(B站) Netty权威指南 第二版
转载自:https://juejin.cn/post/7102020346861060109