使用GO实现gRPC
什么是gRPC
服务器流?
gRPC
服务器流是这样实现的:客户端发送一个请求,服务器在同一通道或连接中返回多个消息。客户端从返回的流中读取信息,直到没有更多的信息。
这种方法可以在我们有很多信息,并且我们想把它作为一个缓冲区来处理,分部分读取,并在客户端安全地完成。
想想一个最常见的例子,一个视频流媒体平台,在流程中,客户端不能在一个请求中得到所有的视频来处理它,在这种情况下,我们可以使用一个服务器流,把视频的每个部分按其需要传递给客户端。
例子
在下面的例子中,我们将模拟一个来自客户端的请求,该请求发送一个ID
,然后服务器将在每个消息上响应一系列的字节,这就是模拟一种数据缓冲区。
很简单吧,让我们去实现吧。
项目结构:
注意:对于这段代码,我没有遵循任何架构来串联包,这只是为了告诉你如何实现一个
gRPC
服务器流的客户端和服务器。
Step 1
你需要安装以下依赖项,并确保在环境变量中包含:
go install google.golang.org/protobuf/cmd/protoc-gen-go@latest
go install google.golang.org/grpc/cmd/protoc-gen-go-grpc@latest
Step 2
在这个步骤中,我们将遵循以下几点:
- 定义带有信息和服务器定义的
proto
文件。 - 从
proto
生成golang文件。 - 实现
gRPC
服务器流。 - 实现
gRPC
客户端。
定义带有信息和服务器定义的proto
文件
在proto
文件中,我们将定义两条消息,一条是请求DataRequest
,另一条是响应DataResponse
。
DataRequest: 将被客户端用来消费gRPC服务器。在这个例子中,我们只是定义了一个ID属性。
DataResponse: 将被服务器用来生成响应。在这个例子中,响应将定义缓冲区,它是假数据流的值,以及帮助我们识别包的位置的部分。
另一件重要的事情是StreamingService
的定义,它将被用来准备我们的流媒体服务器,你可以看到在返回值中我们在响应数据类型前定义了关键词stream
,这样做表明我们将使用一个流媒体服务器。
dataStreaming.proto:
syntax = "proto3";
package protofiles;
option go_package = "protofiles/data_streaming";
message DataRequest {
string id = 1;
}
message DataResponse {
string buffer = 1;
int32 part = 2;
}
service StreamingService {
//unary
rpc GetDataStreaming(DataRequest) returns (stream DataResponse) {}
}
从proto生成golang文件
为了生成所需的文件,你可以使用以下命令:
protoc --go_out=. --go_opt=paths=source_relative \
--go-grpc_out=require_unimplemented_servers=false:. protofiles/data_streaming/dataStreaming.proto
运行该命令后,你会看到两个新的自动生成的文件:
这些文件中的每一个都包含了来自proto
文件的所有转义,这意味着你将在里面找到响应和请求的结构,实现gRPC
服务器和客户端的接口和结构,等等。
实现gRPC
服务器流
为了实现我们的服务器,我们需要定义一个结构,用来实现gRPC
服务方法,这些方法应该与生成的文件中定义的相同。在我们的例子中,我们需要实现GetDataStreaming
。
我们可以在下面的接口中看到生成文件中的签名:
type StreamingServiceServer interface {
GetDataStreaming(
*DataRequest,
StreamingService_GetDataStreamingServer
) error
}
在GetDataStreaming
方法中,我们将向客户端发送每条消息,正如你所看到的,我们通过使用for
循环来实现,在每次迭代中我们都会发送流媒体。
现在让我们谈谈定义在main
中的代码。在那里我们可以找到执行服务器所需的代码,它被定义为一个TCP
。我们可以看到的其他重要的事情是对grpc.NewServer()
的调用,它将创建一个新的gRPC
服务器,然后我们需要注册服务器,用方法实现的结构从生成的文件中调用RegisterStreamingServiceServer
。
package server
import (
"log"
"math/rand"
"net"
pb "server_stream_example/protofiles/data_streaming"
"google.golang.org/grpc"
)
type server struct{}
// *DataRequest, StreamingService_GetDataStreamingServer) error
func (s server) GetDataStreaming(req *pb.DataRequest, srv pb.StreamingService_GetDataStreamingServer) error {
log.Println("Fetch data streaming")
for i := 0; i < 10; i++ {
value := randStringBytes(500)
resp := pb.DataResponse{
Part: int32(i),
Buffer: value,
}
if err := srv.Send(&resp); err != nil {
log.Println("error generating response")
return err
}
}
return nil
}
func randStringBytes(n int) string {
const letterBytes = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ"
b := make([]byte, n)
for i := range b {
b[i] = letterBytes[rand.Intn(len(letterBytes))]
}
return string(b)
}
func main() {
// create listener
listener, err := net.Listen("tcp", "localhost:8080")
if err != nil {
panic("error building server: " + err.Error())
}
// create gRPC server
s := grpc.NewServer()
pb.RegisterStreamingServiceServer(s, server{})
log.Println("start server")
if err := s.Serve(listener); err != nil {
panic("error building server: " + err.Error())
}
}
实现gRPC
客户端
对于我们的客户,我们将使用生成的proto
文件中的代码,这样做我们就可以执行在我们的服务器上定义的功能。
第一步是拨号连接到我们的gRPC
服务器grpc.Dial
,然后我们需要创建一个服务器-客户端的实例,这将允许我们通过函数调用来消费服务器。
与只等待一个响应的单数客户端不同,消费服务器流的客户端需要迭代,直到所有的消息都由服务器发送。
在这个例子中,我们正在检查EOF
错误,以控制客户端何时获得所有的消息。
package main
import (
"context"
"fmt"
"io"
"log"
"google.golang.org/grpc"
pb "server_stream_example/protofiles/data_streaming"
)
func main() {
// dial to server
conn, err := grpc.Dial("localhost:8080", grpc.WithInsecure())
if err != nil {
log.Println("Error connecting to gRPC server: ", err.Error())
}
defer conn.Close()
// create the stream
client := pb.NewStreamingServiceClient(conn)
req := pb.DataRequest{Id: "123"}
stream, err := client.GetDataStreaming(context.Background(), &req)
if err != nil {
panic(err) // dont use panic in your real project
}
for {
resp, err := stream.Recv()
if err == io.EOF {
return
} else if err == nil {
valStr := fmt.Sprintf("Response\n Part: %d \n Val: %s", resp.Part, resp.Buffer)
log.Println(valStr)
}
if err != nil {
panic(err) // dont use panic in your real project
}
}
}
运行服务器和客户端服务
server/main.go
client/main.go
准备好了! 现在你已经有了开始建立你自己的流媒体gRPC
服务的基础,记住这只是一个指南,帮助你了解它是如何工作和如何实现的。从这里开始,我们就可以开始讨论与gRPC世界有关的更复杂的解决方案了。
如果你在运行客户端出现以下错误时:
panic: rpc error: code = Unavailable desc = connection error: desc = "error reading server preface: http2: frame too large"
调整一下服务端以及客户端所使用的端口即可
转载自:https://juejin.cn/post/7193515335339737147