likes
comments
collection
share

一个基于消息队列的的Go语言RPC框架

作者站长头像
站长
· 阅读数 7

一个基于消息队列的的Go语言RPC框架

项目地址: github.com/yc90s/xrpc.…

前言

RPC作为分布式系统中的基础组件, 使用非常广泛。大多数的RPC框架都是基于点对点的网络连接, 比如golang原生的rpc框架、grpc等. 点对点连接的通讯方式, 随着集群节点的增加, 会导致集群的拓扑结构越来越复杂, 服务之间的耦合度越来越高, 服务的扩展性和可维护性都会受到影响.

而消息队列的通讯方式, 可以很好的解决这个问题。每个服务只需要关注自己订阅的消息, 不需要关心消息的发送者是谁, 也不需要关心消息的接收者是谁.

XRPC设计的原则是为了实现一套基于消息队列的、易于拓展和易于使用的轻量级RPC框架.

XRPC的特性

除了上面提到的使用消息队列作为RPC的通道之外, XRPC还有以下几个比较实用的特点

  • 支持任意参数数量的远程调用, 不需要把接口的参数都打包成一个结构体再调用, 可以像调用本地函数一样
  • 支持CallCast两种远程调用方式, Call会一直阻塞直到接收到返回值或者超时, 而Cast适用于不需要等待返回值的情况
  • 代码生成, 实现了一套IDL, 最大程度贴近go语法, 用来定义rpc服务的接口信息, 自动生成接口代码

此外, XRPC的核心代码非常精简, 而且非常容易拓展.

XRPC的实现

一个RPC框架可以大体分为三个部分:通信、编码/解码、服务调用. 我从这三个方面分别介绍XRPC是怎么做的.

XRPC的每个服务都会订阅一个主题, 等待接收远程调用的消息, 每个服务可以注册多个接口. 客户端将要调用的接口和参数序列化后发布到对应的主题, 服务端收到消息后利用反射调用对应接口, 并将结果通过消息队列返回客户端.

通信

XRPC抽象出了一套消息队列接口

type MQueen interface {
	GenerateSubj() string
	Publish(string, []byte) error
	Subscribe(string, MQCallback) error
	UnSubscribe() error
}
  • GenerateSubj 生成一个唯一的订阅主题名
  • Publish 发布一条消息到指定的主题
  • Subscribe 订阅指定的主题
  • UnSubscribe 取消订阅

要拓展使用其他的消息队列, 只需要实现MQueen接口即可, 目前实现的有nats.

编码/解码

编码/解码部分XRPC也抽象了一个接口

type Codec interface
{
    Unmarshal(b []byte, dst any) error
	Marshal(v any) ([]byte, error)
}

同样只要实现这个接口就可以拓展自己的序列化方式, 目前实现的有gob、protobuf, 默认采用gob

服务调用

XRPC的支持注册任意数量参数的接口, 以及CallCast两种远程调用方式. 并且实现了一套IDL, 最大程度贴近go语法, 用来定义rpc接口信息, 并自动生成相关代码, 下面是一个简单的例子.

首先定义我们的RPC服务接口hello.service

package main 

service HelloService {
    Hello(string) (string, error)
}

可以看到语法和Go非常类似, 它里面定义了一个名叫HelloService服务, 包含一个Hello方法, 有一个参数两个返回值.

一个文件里面可以定义多个服务, 每个服务可以定义多个接口, 接口支持任意数量的参数.

然后生成接口代码, 执行下面的命令会在当前目录生成一个hello.service.go文件

xrpc hello.service

接下来就可以实现我们的RPC服务

type HelloRPCService struct {
	*xrpc.RPCServer
}

func (s *HelloRPCService) Hello(request string) (string, error) {
	reply := "hello:" + request
	return reply, nil
}

// 创建服务
func newHelloService(nc *nats.Conn) *HelloRPCService {
	s := &HelloRPCService{
		RPCServer: xrpc.NewRPCServer(
			xrpc.SetMQ(natsmq.NewMQueen(nc)),
			xrpc.SetSubj("hello_server"),
		),
	}
	RegisterHelloServiceServer(s.RPCServer, s)
	return s
}
  • HelloRPCService 实现了我们定义的RPC接口,
  • Hello 接口将收到的消息添加"hello:"前缀并将结果返回
  • newHelloService 方法用来创建RPC服务, 采用nats消息队列, 指定订阅的主题为hello_server

接下来创建RPC客户端

func newHelloRPCServiceClient(nc *nats.Conn) *HelloServiceClient {
	return NewHelloServiceClient(xrpc.NewRPCClient(
		xrpc.SetMQ(natsmq.NewMQueen(nc)),
		xrpc.SetSubj("hello_client"),
	))
}

现在我们就可以调用远程接口了

func main() {
	nc, err := nats.Connect("nats://127.0.0.1:4222", nats.MaxReconnects(1000))
	if err != nil {
		panic(err)
	}
	defer nc.Close()

	// 启动RPC服务
	s := newHelloService(nc)
	err = s.Start()
	if err != nil {
		panic(err)
	}
	defer s.Stop()

	// 创建RPC客户端
	c := newHelloRPCServiceClient(nc)
	defer c.Close()

	// 调用Hello方法, 第一个参数是rpc服务的名称
	reply, err := c.Hello("hello_server", "yc90s")
	if err != nil {
		panic(err)
	}
	fmt.Println(reply) // 输出: hello:yc90s
}

Hello接口的第一个参数是RPC服务订阅的主题名, 例子里是hello_server, 后面的参数是传递给远程接口实际调用的参数, 程序最后输出"hello:yc90s"

总结

RPC作为分布式系统中基础又重要的一个组件, 所以我将其单独开源出来, 后续会再xrpc基础上, 再开源一个分布式服务器框架, 欢迎感兴趣的同学一起交流