【从0-1 千万级直播项目实战】gRpc落地 | 可能是全网最全实战篇
说明
首先,感谢您抽出宝贵的时间来读我的文章。我知道您的时间很宝贵,而且有很多其他有趣的事情可以做。但是您还是选择了点进来,这让我感到非常欣慰。
在我前面的文章中有提到长连接选型,通过各种综合考虑后面我们选了gRpc,如今距离gRpc成功落地并上线已有几个月,在此期间也是遇到了一些坑,也翻遍了很多社区和文档,最终结合自己的一些思考,成功落地了一套gRpc服务端基于Java微服务、分布式、高可用的长连接服务。
关于标题
请允许我说为什么可能是全网最全实战篇,我并不是标题党,在许多关于gRpc的社区和文章中,多是偏基础使用入门的demo,比较难满足生产环境的使用需要,本文主要针对生产环境落地实践进行阐述,结合直播业务功能点对gRpc进行详细的使用说明。
系统、版本
gRpc服务端
基于Java,SpringCloud Alibaba/SpringBoot、Nacos 3.0(3.0版本的Nacos才支持gRpc协议,但如果你不需要在微服务的rpc调用中使用gRpc,则无需关心此项)
gRpc客户端
Java微服务、安卓、IOS
服务依赖、插件
gRpc、protobuf 依赖
<dependency>
<groupId>io.grpc</groupId>
<artifactId>grpc-protobuf</artifactId>
<version>1.51.0</version>
</dependency>
<dependency>
<groupId>io.grpc</groupId>
<artifactId>grpc-stub</artifactId>
<version>1.51.0</version>
</dependency>
<dependency>
<groupId>io.grpc</groupId>
<artifactId>grpc-netty-shaded</artifactId>
<version>1.51.0</version>
</dependency>
<dependency>
<groupId>com.google.protobuf</groupId>
<artifactId>protobuf-java-util</artifactId>
<version>3.7.1</version>
</dependency>
<dependency>
<groupId>com.googlecode.protobuf-java-format</groupId>
<artifactId>protobuf-java-format</artifactId>
<version>1.4</version>
</dependency>
protobuf文件代码自动生成插件
<build>
<extensions>
<extension>
<groupId>kr.motd.maven</groupId>
<artifactId>os-maven-plugin</artifactId>
<version>1.6.2</version>
</extension>
</extensions>
<plugins>
<plugin>
<groupId>org.xolstice.maven.plugins</groupId>
<artifactId>protobuf-maven-plugin</artifactId>
<version>0.6.1</version>
<configuration>
<protocArtifact>com.google.protobuf:protoc:3.12.0:exe:${os.detected.classifier}</protocArtifact>
<pluginId>grpc-java</pluginId>
<pluginArtifact>io.grpc:protoc-gen-grpc-java:1.34.1:exe:${os.detected.classifier}</pluginArtifact>
<!--设置grpc生成代码到指定路径-->
<outputDirectory>${project.basedir}/src/main/java</outputDirectory>
<!--生成代码前是否清空目录-->
<clearOutputDirectory>false</clearOutputDirectory>
</configuration>
<executions>
<execution>
<goals>
<goal>compile</goal>
<goal>compile-custom</goal>
</goals>
</execution>
</executions>
</plugin>
<!-- 设置多个源文件夹 -->
<plugin>
<groupId>org.codehaus.mojo</groupId>
<artifactId>build-helper-maven-plugin</artifactId>
<version>3.0.0</version>
<executions>
<!-- 添加主源码目录 -->
<execution>
<id>add-source</id>
<phase>generate-sources</phase>
<goals>
<goal>add-source</goal>
</goals>
<configuration>
<sources>
<source>${project.basedir}/src/main/gen</source>
<source>${project.basedir}/src/main/java</source>
</sources>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>
gRpc Client SpringBoot集成依赖
<dependency>
<groupId>net.devh</groupId>
<artifactId>grpc-client-spring-boot-starter</artifactId>
<version>2.14.0.RELEASE</version>
</dependency>
gRpc Server SpringBoot集成依赖
<dependency>
<groupId>net.devh</groupId>
<artifactId>grpc-server-spring-boot-starter</artifactId>
<version>2.14.0.RELEASE</version>
</dependency>
proto文件接口定义、代码生成
咱们基于实际业务出发,先简单来个麦位管理接口练练手
syntax = "proto3";
import "google/protobuf/any.proto";
package com.xxx.grpc.room;
option java_multiple_files = true;
option java_package = "com.xxx.xxx.room";
option objc_class_prefix = "PB3Room";
//麦位接口
service Mic{
//获取房间麦位列表
rpc getMicList(GetMicListRequest) returns (GetMicListResponse) {};
//上麦
rpc micUp(MicUpRequest) returns (MicUpResponse) {};
//下麦
rpc micDown(MicDownRequest) returns (MicDownResponse) {};
//更改麦位状态
rpc micStateUpdate(MicStateUpdateRequest) returns (MicStateUpdateResponse) {};
//抱上麦位
rpc micHoldUp (MicHoldUpRequest) returns (MicHoldUpResponse) {};
//抱下麦位
rpc micHoldDown (MicHoldDownRequest) returns (MicHoldDownResponse) {};
//申请排麦
rpc micUpApply (MicUpApplyRequest) returns (MicUpApplyResponse) {};
//取消排麦
rpc micUpApplyCancel (MicUpApplyCancelRequest) returns (MicUpApplyCancelResponse) {};
}
//request----------------------------
message GetMicListRequest {
int64 roomId = 1; //房间ID
}
message MicUpRequest {
int64 roomId = 1; //房间ID
int32 position = 2; //麦位顺序
bool auto = 3; //是否需要自动上麦 true:需要 false:不需要 此参数为true时自动忽略position字段的值 为false时才会取position字段进行手动上麦
}
message MicDownRequest {
int64 roomId = 1; //房间ID
int32 position = 2; //麦位顺序
}
message MicStateUpdateRequest {
int64 roomId = 1; //房间ID
int32 position = 2; //麦位顺序
MicState state = 3; //状态
int64 userId = 4; //用户ID
}
message MicHoldUpRequest{
int64 roomId = 1; //房间ID
int64 userId = 2; //被操作的用户ID
int32 position = 3; //麦位 (有position时先判断position 没有判断userId所在position)
}
message MicHoldDownRequest{
int64 roomId = 1; //房间ID
int64 userId = 2; //被操作的用户ID
int32 position = 3; //麦位顺序
}
message MicUpApplyRequest{
int64 roomId = 1; //房间ID
}
message MicUpApplyCancelRequest{
int64 roomId = 1; //房间ID
}
// response----------------------------------------------------------
message GetMicListResponse{
repeated MicInfo micList = 1; //麦位列表
}
message MicUpResponse{
bool success = 1; //是否成功
}
message MicDownResponse{
bool success = 1; //是否成功
}
message MicStateUpdateResponse{
bool success = 1; //是否成功
int32 position = 2; //当前修改的麦位
}
message MicHoldUpResponse{
bool success = 1; //是否成功
}
message MicHoldDownResponse{
bool success = 1; //是否成功
}
message MicUpApplyResponse{
bool success = 1; //是否成功
}
message MicUpApplyCancelResponse{
bool success = 1; //是否成功
}
// object---------------------
//麦位信息
message MicInfo{
int64 userId = 1; //用户ID
int32 position = 2; //麦位
string nickname = 3;//昵称
string avatar = 4;// 头像
bool bossPosition = 5;//是否老板麦位
MicState state = 6; //麦位状态
int64 charmScore = 7; //在麦位上的魅力值
string avatarFrameUrl = 8; //用户佩戴头像框
}
//麦位状态
enum MicState{
OPEN = 0;//麦位麦克风正常打开状态
CLOSE = 1;//麦位麦克风被关闭状态
BAN = 2; //麦位被封禁
}
//异常状态码
enum ErrorCode{
//服务器异常
SERVER_ERROR = 0;
//非法操作
ILLEGAL_OPERATION = 1;
//有人在麦上
SOMEONE_IN_MIC = 1000;
//麦位被封禁
MIC_BAN = 1001;
//没有空闲麦位--麦下用户自动上麦时提示
NO_FREE_MIC = 1002;
//用户已经离开房间
NOT_IN_ROOM = 1003;
//无权限操作
NO_ACCESS = 1999;
}
找到proto文件所在项目-Lifecycle-complile双击自动生成代码
生成的代码如下,自动帮我们实现了序列化与反序列化的协议,同时它也是一种契约,所有的客户端+服务端都用着同一套契约
开始使用
@GrpcService注解作用
在基于 Spring Boot 的 gRPC 服务中,这个注解它的作用是将 gRPC 服务实现类标记为可被 gRPC 服务器扫描和注册的组件。
@GrpcService
注解的作用包括:
-
服务注册:通过将
@GrpcService
注解应用于 gRPC 服务实现类,将其自动注册到 gRPC 服务器中。这样,gRPC 服务器能够知道有哪些服务可用,并能够处理客户端的请求。 -
依赖注入:在基于 Spring Boot 的应用中,
@GrpcService
注解通常与依赖注入框架(如 Spring)一起使用。当使用@GrpcService
注解标记服务实现类时,依赖注入框架会自动扫描并创建该类的实例,并将其注入到 gRPC 服务器中。这使得你可以在服务实现类中使用其他依赖注入的组件或进行相关的业务逻辑。 -
简化配置:使用
@GrpcService
注解可以简化 gRPC 服务的配置。通过标记服务实现类,你无需手动编写繁琐的注册和配置代码,而是让注解和依赖注入框架处理这些细节,使得服务的开发更加方便和高效。
需要注意的是,@GrpcService
注解是针对基于 Spring Boot 的项目,并与 gRPC 框架集成时的一种常用做法。如果你使用其他的 gRPC 框架或平台,可能会有不同的方式来注册和管理 gRPC 服务实现类。
用户鉴权
我们在使用常规Http请求的的时候,通用的鉴权方式是通过HttpRequest拿到header中的token信息来对用户进行鉴权,在gRPC中并没有这种直接获取请求头部信息的方式,但是我们可以通过gRpc给我们提供的拦截器 ServerInterceptor
自己简单写一个类似的功能,代码如下。
public class MetadataConstants {
/**
* 令牌自定义标识
*/
public static final String AUTHENTICATION = "Authorization";
/**
* 客户端类型
*/
public static final String CLIENT_TYPE = "clientType";
}
public class ThreadLocalUtil {
private static final ThreadLocal<Map<String, Object>> threadLocal = ThreadLocal.withInitial(() -> new HashMap<>(10));
public static Map<String, Object> getThreadLocal() {
return threadLocal.get();
}
public static Object get(String key) {
Map<String, Object> map = threadLocal.get();
return map.get(key);
}
public static void set(String key, Object value) {
Map<String, Object> map = threadLocal.get();
map.put(key, value);
}
public static void set(Map<String, Object> keyValueMap) {
Map<String, Object> map = threadLocal.get();
map.putAll(keyValueMap);
}
public static void remove() {
threadLocal.remove();
}
public static <T> T remove(String key) {
Map<String, Object> map = threadLocal.get();
return (T) map.remove(key);
}
}
@Data
@Builder
public class GrpcThreadData {
/**
* 用户ID
*/
private Long userId;
/**
* 平台类型
*/
private int clientType;
}
public class GrpcUtil {
private static final String GRPC_USER_KEY = "grpc-user";
public static Long getUserId() {
GrpcThreadData grpcThreadData = (GrpcThreadData) ThreadLocalUtil.get(GRPC_USER_KEY);
return grpcThreadData.getUserId();
}
public static int getClientType() {
GrpcThreadData grpcThreadData = (GrpcThreadData) ThreadLocalUtil.get(GRPC_USER_KEY);
return grpcThreadData.getClientType();
}
}
public class CommonGrpcServerInterceptor implements ServerInterceptor {
@Override
public <ReqT, RespT> ServerCall.Listener<ReqT> interceptCall(ServerCall<ReqT, RespT> serverCall, Metadata metadata,
ServerCallHandler<ReqT, RespT> serverCallHandler) {
String token = metadata.get(Metadata.Key.of(MetadataConstants.AUTHENTICATION, Metadata.ASCII_STRING_MARSHALLER)),
clientType = metadata.get(Metadata.Key.of(MetadataConstants.CLIENT_TYPE, Metadata.ASCII_STRING_MARSHALLER));
if (StringUtils.isBlank(token)) {
log.error("grpc请求异常,token为空.");
serverCall.close(Status.UNAUTHENTICATED, null);
}
Long userId = JWTUtil.getUserId(token);
if (userId ==null){
log.error("grpc请求异常,token验证失败.");
serverCall.close(Status.UNAUTHENTICATED, null);
}
log.info("grpc请求 | clientType:{} | userId:{} | methodName:{}", clientType, userId, serverCall.getMethodDescriptor().getFullMethodName());
ThreadLocalUtil.set(ThreadLocalConstant.GRPC_USER_KEY,
GrpcThreadData.builder()
.userId(userId)
.clientType(Integer.valueOf(clientType))
.build());
return serverCallHandler.startCall(serverCall, metadata);
}
}
通过拦截器,我们可以简单的对用户进行鉴权操作,并且每次调用我们的接口前将用户鉴权相关信息保存到 ThreadLocal
中,就可以在我们实际的gRpc接口中轻松获取用户鉴权信息了。
这里顺带再提一下 ServerInterceptor
的其他用法
-
认证和授权:
ServerInterceptor
可用于在服务端对请求进行认证和授权的处理。通过拦截请求,你可以验证客户端的身份并对其进行授权,以确保只有经过验证的客户端才能访问受保护的服务。 -
监控和日志:通过
ServerInterceptor
,你可以拦截请求和响应,以记录和监控服务端的性能指标、请求时间、错误日志等信息。这对于追踪问题、性能优化和日志记录非常有用。 -
转换和修改:
ServerInterceptor
允许你在请求到达服务端之前或响应返回客户端之前对消息进行转换和修改。这可以包括消息格式转换、请求参数校验、响应数据的加工处理等,以满足特定业务需求。 -
错误处理:通过
ServerInterceptor
,你可以捕获并处理服务端的异常和错误。这使得你能够自定义错误处理逻辑,例如返回特定的错误码、提供友好的错误信息等。 -
性能优化:通过
ServerInterceptor
,你可以实施缓存、请求合并、限流等性能优化策略,以提高服务端的响应效率和稳定性。
通过实现和注册自定义的 ServerInterceptor
,你可以对 gRPC 服务端进行灵活的定制和扩展。多个 ServerInterceptor
可以以链式的方式组合,形成拦截器的处理链,依次对请求进行处理。
客户端创建存根的方式
- 阻塞式存根(Blocking Stub): 阻塞式存根提供了同步的方法调用,即客户端在发送请求后会一直等待服务器的响应,并返回结果。在方法调用期间,客户端线程会被阻塞,直到服务器响应或超时。 阻塞式存根的方法一般以
blocking
或sync
开头,例如myMethod
。 - 异步存根(Async Stub): 异步存根提供了非阻塞的方法调用,客户端可以异步发送请求,然后通过回调机制或 Future 对象来获取响应。在异步方法调用中,客户端线程不会被阻塞,可以继续执行其他操作。 异步存根的方法一般以
future
、listen
、observe
等关键词开头,例如myMethodAsync
。 - 流存根(Streaming Stub): 流存根用于处理流式数据,支持客户端流、服务器流和双向流三种模式。客户端流表示客户端向服务器发送多个请求,服务器返回一个响应。服务器流表示服务器向客户端发送多个响应,客户端返回一个请求。双向流表示客户端和服务器之间同时发送多个请求和响应。 流存根的方法一般以
stream
、streaming
等关键词开头,例如myMethodStream
。
一元式使用
客户端一元式代码
public class TestClient {
public static void main(String[] args) {
ManagedChannel channel = ManagedChannelBuilder.forAddress("localhost", 32001)
.usePlaintext()
.build();
// 创建一元式阻塞式存根
MicGrpc.MicBlockingStub blockingStub = MicGrpc.newBlockingStub(channel);
// 创建请求对象
GetMicListRequest request = GetMicListRequest.newBuilder()
.setRoomId(20012520L)
.build();
// 发送请求麦位列表信息并接收响应
GetMicListResponse response = blockingStub.getMicList(request);
// 处理麦位列表信息响应
System.out.println("Received mic list response: " + response);
}
}
服务端一元式代码
@GrpcService
@Slf4j
public class RoomMicGrpcService extends MicGrpc.MicImplBase {
//注入实际业务服务接口,不方便贴实现代码
@Autowired
private LiveRoomMicClient liveRoomMicClient;
@Override
public void getMicList(GetMicListRequest getMicListRequest, StreamObserver<GetMicListResponse> responseObserver) {
Long userId = GrpcUtil.getUserId(),
roomId = getMicListRequest.getRoomId();
log.info("用户获取麦位信息 | roomId:{} | userId:{}", roomId, userId);
//拿到 8个麦位里的信息
List<LiveRoomMicVo> list = liveRoomMicClient.getMicList(roomId, userId);
//返回数据给客户端
responseObserver.onNext(GrpcBuildMessageUtil.buildMicListResponse(list));
//告知客户端该 gRPC 调用已经完成
responseObserver.onCompleted();
}
}
这样子,一个简单的gRpc一元式接口就完成了。
双向流使用
这里我直接讲双向流,跳过了客户端流和服务端流的使用,因为双向流是它们两个的扩展,涉及的知识点也比较多,所以只要掌握了双向流,其他两个都不是什么大问题。
使用场景
在直播间内,有麦位信息更新(上下麦、魅力值变化时)需要对所有用户进行一个信息更新
客户端与服务端建立连接
proto文件增加一个双向流协议
service Rtmp{
rpc listener(stream RtmpMessage) returns (stream RtmpMessage) {};
}
//实时消息协议信息
message RtmpMessage{
int64 roomId = 1; //房间ID
MessageType messageType = 2; //消息类型
repeated MicInfo micList = 3; //麦位信息列表
enum MessageType{
//麦位消息,entity:MicInfo
MIC = 0;
}
}
这边以Java客户端代码为例,写一个Demo
// 创建与 gRPC 服务器的通道
ManagedChannel channel = ManagedChannelBuilder.forAddress("localhost", 32001)
.usePlaintext() // 在本示例中使用明文连接,生产环境请使用安全连接
.build();
//当前房间ID
Long roomId = 20012520L;
// 创建双向流 Stub
RtmpGrpc.RtmpStub stub = RtmpGrpc.newStub(channel);
// 创建双向流观察者
StreamObserver<RtmpMessage> requestObserver = stub.listener(new StreamObserver<RtmpMessage>() {
@Override
public void onNext(RtmpMessage response) {
// 处理从服务端接收到的响应消息
System.out.println("Received response: " + response.getRoomId());
}
@Override
public void onError(Throwable t) {
// 处理错误情况
System.err.println("Error occurred: " + t.getMessage());
}
@Override
public void onCompleted() {
// 处理流结束的完成状态
System.out.println("Stream completed");
}
});
// 发送消息到服务端
requestObserver.onNext(RtmpMessage.newBuilder().setMessageType(RtmpMessage.MessageType.MIC)
.setRoomId(roomId)
.build());
requestObserver.onNext(RtmpMessage.newBuilder().setMessageType(RtmpMessage.MessageType.MIC)
.setRoomId(roomId)
.build());
// 结束流通信
requestObserver.onCompleted();
// 关闭通道
channel.shutdown();
服务端维护客户端存根
@GrpcService
@Slf4j
public class RoomRtmpGrpcService extends RtmpGrpc.RtmpImplBase {
// 维护客户端存根
private static ConcurrentHashMap<Long, StreamObserver> userStreamMap = new ConcurrentHashMap<>();
@Override
public StreamObserver<RtmpMessage> listener(StreamObserver<RtmpMessage> streamObserver) {
// server => client
Long userId = GrpcUtil.getUserId();
putUserStream(streamObserver);
// client => server
return new StreamObserver<RtmpMessage>() {
@Override
public void onNext(RtmpMessage value) {
log.info("接收到客户端消息:rtmp message type:{} | roomId:{}", value.getMessageTypeValue(), value.getRoomId());
}
@Override
public void onError(Throwable t) {
log.info("grpc异常 | {}", t.getMessage());
}
@Override
public void onCompleted() {
log.info("客户端 rtmp 流使用完成");
StreamObserver<RtmpMessage> rtmpMessageStreamObserver = getUserStream(userId);
if (null != rtmpMessageStreamObserver) {
rtmpMessageStreamObserver.onCompleted();
}
removeUserStream(roomId, userId);
}
};
}
private void putUserStream(StreamObserver<RtmpMessage> streamObserver) {
Long userId = GrpcUtil.getUserId();
log.info("客户端打开双向流 | userId:{}", userId);
userStreamMap.put(userId, streamObserver);
}
private void removeUserStream(Long roomId, Long userId) {
userId = userId == null ? GrpcUtil.getUserId() : userId;
log.info("剔除客户端流 | roomId:{} | userId:{}", roomId, userId);
userStreamMap.remove(userId);
}
private StreamObserver<RtmpMessage> getUserStream(Long userId) {
return userStreamMap.get(userId == null ? GrpcUtil.getUserId() : userId);
}
}
如上所示,客户端的双向流存根我们就维护好了,接下来可以考虑集群环境下连接和推送消息的问题了。
集群模式下gRpc长连接的解决方案
考虑要点
-
负载均衡:负载均衡器应该能够维护和管理长连接,确保连接的均衡分布在后端服务上。负载均衡器应根据后端服务的负载情况和可用性,动态地选择和分配长连接。
-
心跳检测:为了保持长连接的健康状态,可以使用心跳机制进行定期的连接检测。心跳检测可以通过定时发送心跳消息并等待响应来确认连接是否仍然有效。如果连接出现异常或超时,则可以进行相应的重连或错误处理。
-
断线重连:在长连接的情况下,可能会出现网络中断、服务器故障等问题导致连接中断。在这种情况下,客户端和服务器应该能够进行断线重连,以恢复连接并继续通信。断线重连机制应该是自动的,并具有适当的重试策略和指数退避机制,以避免过度的重连尝试。
-
优雅关闭:当客户端或服务器需要关闭连接时,应该使用优雅关闭的方式来终止连接。优雅关闭可以确保正在进行的请求得到处理,并在关闭前完成。这可以通过发送特定的关闭信号或协议消息来触发。
-
配置调整:长连接的参数和配置可以根据实际需求进行调整。这包括连接超时时间、心跳间隔、连接池大小等。根据负载和性能需求,可以进行适当的调整和优化。
SpringBoot下gRpc长连接配置调整优化
grpc:
server:
port: 32001
#当启用保活时,gRPC 将周期性地发送心跳消息来检测连接的状态,并保持连接处于活动状态
enable-keep-alive: true
#表示当发送完ping packet后多久没收到client回应算超时
keep-alive-timeout: 1s
#表示当grpc连接没有数据传递时,多久之后开始向client发送ping packet
keep-alive-time: 10s
#是否允许客户端发送保持活跃的HTTP/2 ping,即使在连接上没有未完成的rpc。默认为false。
permit-keep-alive-without-calls: true
max-inbound-message-size: 10485760 # 10Mb 该参数控制服务端可以接收的最大消息大小。对于高并发场景,建议将其设置为较小的值,以减少网络延迟和内存占用。
客户端断线重连机制
- 定期检测性重连机制
/**
- 使用 `ManagedChannelBuilder` 设置 `usePlaintext()` 或 `useTransportSecurity()` 方法,以指定连接的安全性。
- 使用 `withWaitForConnected()` 方法设置等待重新连接的时间间隔。
- 使用 `withReconnectBackoff()` 方法设置重连的退避策略,例如指数退避。
- 通过调用 `build()` 方法创建 `ManagedChannel` 对象。
- 当与服务器的连接中断时,gRPC 客户端会自动尝试重新连接。
**/
ManagedChannel channel = ManagedChannelBuilder
.forAddress('localhost', 32001)
.usePlaintext()
.withWaitForConnected()
.withReconnectBackoff(...)
.build();
- 监听连接关闭重连机制
@Component
public class GRpcClientReconnect implements ApplicationListener<ApplicationReadyEvent> {
@Value("${grpc.server.host}")
private String serverHost;
@Value("${grpc.server.port}")
private int serverPort;
private ManagedChannel channel;
private RtmpGrpc.RtmpStub client;
@PostConstruct
private void initialize() {
channel = createChannel();
client = RtmpGrpc.newStub(channel);
}
@Override
public void onApplicationEvent(ApplicationReadyEvent event) {
// 在应用程序启动后,可以使用 client 执行 gRPC 请求
}
private ManagedChannel createChannel() {
ManagedChannel channel = ManagedChannelBuilder
.forAddress(serverHost, serverPort)
.usePlaintext()
.build();
// 添加断线重连的 ClientInterceptor
ClientInterceptor reconnectInterceptor = new ClientInterceptor() {
@Override
public <ReqT, RespT> ClientCall<ReqT, RespT> interceptCall(MethodDescriptor<ReqT, RespT> method,
CallOptions callOptions, Channel next) {
// 创建一个新的 Call 实例
ClientCall<ReqT, RespT> call = next.newCall(method, callOptions);
// 包装 Call 实例,添加断线重连逻辑
return new ForwardingClientCall.SimpleForwardingClientCall<ReqT, RespT>(call) {
@Override
public void start(Listener<RespT> responseListener, Metadata headers) {
// 创建一个新的监听器,以便处理连接中断和重新连接逻辑
Listener<RespT> reconnectingListener = new ForwardingClientCallListener.SimpleForwardingClientCallListener<RespT>(responseListener) {
private boolean isReconnecting = false;
@Override
public void onClose(Status status, Metadata trailers) {
// 当连接关闭时,判断是否需要重连
if (status.getCode() == Status.Code.UNAVAILABLE && !isReconnecting) {
System.out.println("Connection lost. Reconnecting...");
isReconnecting = true;
// 实现自定义的重连逻辑,例如等待一段时间后重新创建 Channel
// 这里简单地等待 5 秒后重新创建 Channel
try {
Thread.sleep(5000);
} catch (InterruptedException e) {
e.printStackTrace();
}
ManagedChannel newChannel = createChannel();
ClientCall<ReqT, RespT> newCall = newChannel.newCall(method, callOptions);
} else {
super.onClose(status, trailers);
}
}
};
delegate().start(reconnectingListener, headers);
}
};
}
};
// 添加断线重连的 ClientInterceptor
channel = (ManagedChannel) ClientInterceptors.intercept(channel, reconnectInterceptor);
return channel;
}
}
长连接负载均衡
常规负载方案
客户端的长连接通过网关负载均衡可能走到不同的长连接服务器上,大致如图所示
此架构下解决的问题
- 负载均衡
- 长连接服务高可用
此架构下存在的问题
- 不确定用户长连接具体分散在哪一台机器,无法对用户进行点对点的消息推送
客户端拉服务路由表,Hash算法负载
针对以上问题我们可以根据用户ID进行hash算法计算出客户端需要路由到的长连接服务
此架构下解决的问题
- 负载均衡
- 确定用户具体分散机器,能进行点对点推送
此架构下存在的问题
- 长连接服务非高可用,重启、部分机器宕机会导致其下所有连接受影响
客户端拉本地集群路由表,Hash算法集群负载
针对以上两种方案的问题我们出个多集群模式的架构方案进行改进
此架构下解决的问题
- 负载均衡
- 长连接服务高可用
- 确定用户长连接具体分散集群机器,可进行精准点对点
- 解决了重启、部分机器宕机导致连接不可用的问题
存在问题
- 成本稍大
- 滚动发布时单个集群内至少需要保证有长连接服务是可用状态
双向流消息推送
点对点消息推送
为什么是广播消费呢?
用户长连接落在一块集群,当然是可以知道在哪一台机器,但因为MQ消息分发是基于主题和标签的发布/订阅模型,只有广播消费,我们才能在消费时去过滤消息中对应的用户ID是否在对应机器上有建立长连接,有的话推送消息出去。
代码
@Data
@AllArgsConstructor
@NoArgsConstructor
@Builder
public class GrpcRoomRtmpMessageDto extends BaseMqMessage implements Serializable {
/**
* 房间ID
*/
private Long roomId;
/**
* 广播所有房间
*/
private boolean broadcastAllRoom;
/**
* 用户ID
*/
private Long userId;
/**
* 用户ID集合
*/
private Set<Long> userIdSet;
/**
* rtmp消息类型
*/
private int rtmpMessageType;
/**
* 推送类型
*/
private int pushType;
/**
* 数据
*/
private String data;
}
对应图上gRpc集群服务A中的服务下MQ消费监听
@Slf4j
@Service
@MqGrpcGroupConsumeEvent(event = RocketMqBizConstant.Grpc.Broadcast.ROOM\_RTMP\_MESSAGE)
public class RoomRtmpMqMessageHandle extends BaseGrpcGroupMqHandler {
@Override
public void handleMessage(String message) {
log.info("RTMP业务消息处理:{}", message);
GrpcRoomRtmpMessageDto grpcRoomRtmpMessageDto = GsonUtil.GsonToBean(message, GrpcRoomRtmpMessageDto.class);
int pushType = grpcRoomRtmpMessageDto.getPushType();
if (pushType == RtmpPushType.PPP.getType()) {
//点对点
if (rtmpMessageType == RtmpMessage.MessageType.ROOM_VALUE) {
//房间点信息对点
if (CollectionUtils.isEmpty(userIdSet)) {
roomRtmpGrpcService.sendRoomChangeMessage(roomId, userId);
} else {
userIdSet.forEach(o -> {
roomRtmpGrpcService.sendRoomChangeMessage(roomId, userId);
});
}
}
}
}
}
public void sendRoomChangeMessage(Long roomId, Long userId) {
StreamObserver streamObserver = userStreamMap.get(userId);
if (null == streamObserver) {
//如果此条消息不在此实例,直接过滤
return;
}
log.info("推送客户端房间变化消息 | roomId:{} | userId:{}", roomId, userId);
try {
streamObserver.onNext(RtmpMessage.newBuilder().setMessageType(RtmpMessage.MessageType.ROOM)
.build());
log.info("推送客户端房间变化消息成功 | roomId:{} | userId:{}", roomId, userId);
} catch (IllegalStateException illegalStateException) {
log.error("客户端长连接状态异常 | userId:{} | message", userId,illegalStateException.getMessage());
} catch (Exception e) {
log.error(String.format("推送客户端房间变化消息异常 | roomId:%s | userId:%s", roomId, userId), e);
}
}
广播消息推送
如图所示,假设图上的用户都在同一个房间,他们的长连接都在不同的集群、机器上,这个时候假设有一个用户在房间内上麦,需要向房间内所有用户推送麦位信息更新的广播。
代码
对应图上gRpc集群服务A中的服务下MQ消费监听
@Slf4j
@Service
@MqGrpcGroupConsumeEvent(event = RocketMqBizConstant.Grpc.Broadcast.ROOM_RTMP_MESSAGE)
public class RoomRtmpMqMessageHandle extends BaseGrpcGroupMqHandler {
@Override
public void handleMessage(String message) {
log.info("RTMP业务消息处理:{}", message);
GrpcRoomRtmpMessageDto grpcRoomRtmpMessageDto = GsonUtil.GsonToBean(message, GrpcRoomRtmpMessageDto.class);
int pushType = grpcRoomRtmpMessageDto.getPushType();
if (pushType == RtmpPushType.BROADCAST.getType()) {
//广播
if (rtmpMessageType == RtmpMessage.MessageType.MIC_VALUE) {
//麦位广播
roomRtmpGrpcService.broadcastRoom(roomId, broadcastAllRoom, userId, RtmpMessage.MessageType.MIC);
}
}
}
}
public void broadcastRoom(Long roomId, boolean broadcastAllRoom, Long currentUserId, Long sendUserId, RtmpMessage.MessageType messageType) {
Set<Long> roomUserSet;
if (broadcastAllRoom) {
log.info("全服房间广播");
roomUserSet = userStreamMap.keySet().stream().collect(Collectors.toSet());
} else {
log.info("房间广播 | roomId:{}", roomId);
//业务服务从redis获取房间内所有用户列表
roomUserSet = SetUtil.defaultSet(liveRoomUserClient.getLiveRoomUserList(roomId));
if (CollectionUtils.isEmpty(roomUserSet)) {
log.info("房间没人,不广播 | roomId:{}", roomId);
return;
}
}
if (null == messageType) {
return;
}
if (messageType.equals(RtmpMessage.MessageType.MIC)) {
//广播麦位变化
//业务服务获取全部麦位信息
List<LiveRoomMicVo> list = liveRoomMicClient.getMicList(roomId, currentUserId == null ? GrpcUtil.getUserId() : currentUserId);
GetMicListResponse getMicListResponse = GrpcBuildMessageUtil.buildMicListResponse(list);
roomUserSet.forEach(o -> sendMicChangeMessage(roomId, o, getMicListResponse));
}
}
public void sendMicChangeMessage(Long roomId, Long userId, GetMicListResponse getMicListResponse) {
StreamObserver streamObserver = userStreamMap.get(userId);
if (null == streamObserver) {
return;
}
log.info("推送客户端麦位变化消息 | roomId:{} | userId:{}", roomId, userId);
try {
streamObserver.onNext(RtmpMessage.newBuilder().setMessageType(RtmpMessage.MessageType.MIC).setRoomId(roomId)
.addAllMicList(getMicListResponse.getMicListList())
.build());
log.info("推送客户端麦位变化消息成功 | roomId:{} | userId:{}", roomId, userId);
} catch (IllegalStateException illegalStateException) {
log.error("客户端长连接状态异常 | userId:{} | message", userId,illegalStateException.getMessage());
} catch (Exception e) {
log.error(String.format("推送客户端麦位变化消息异常 | roomId:%s | userId:%s", roomId, userId), e);
}
}
推送消息优化
上面的推送消息代码只是一个简单的示例,如果放在生产环境,性能是不佳的,而且可能存在一定问题,主要优化点有以下几点
同一时间内对同一用户推送多条消息,消息没有顺序性
大家都对A-B-A非常熟知了,可能导致的问题就不用多说了,我这直接说常用的几种解决方案
-
序列号:在每条消息中添加一个序列号字段,客户端可以根据序列号来判断消息的顺序,并进行处理。服务端发送消息时,按照预定的顺序设置序列号。
-
延迟处理:客户端接收到多条消息后,可以将这些消息存储在缓冲区中,按照一定的规则进行排序和处理。例如,可以使用优先级队列或时间戳来确定消息的顺序,并逐个处理。
-
限制并发推送:服务端可以限制并发推送给同一用户的消息数量,确保只有一条消息被推送并处理,直到客户端完成当前消息的处理后再推送下一条消息。
-
引入应答机制:服务端在推送消息后,等待客户端的应答,确认上一条消息已经处理完毕,然后再推送下一条消息。这可以通过客户端发送一个确认消息的方式实现。
1,2两点很显然客户端处理就行,第4点没有那么必要,在高并发场景下,所有消息都引入应答机制那性能消耗太大,而且有些广播消息我们甚至都不关心是否需要100%送达,所以第4点一般只适合对推送消息送达可靠性需要保证的业务,我们这边服务端这边直接实践第3点 使用Sentinel限制并发推送
@SentinelResource(value = "sendMicChangeMessage", blockHandler = "handleBlock")
public void sendMicChangeMessage(Long roomId, Long userId, GetMicListResponse getMicListResponse) {
StreamObserver streamObserver = userStreamMap.get(userId);
if (null == streamObserver) {
return;
}
log.info("推送客户端麦位变化消息 | roomId:{} | userId:{}", roomId, userId);
try {
streamObserver.onNext(RtmpMessage.newBuilder().setMessageType(RtmpMessage.MessageType.MIC).setRoomId(roomId)
.addAllMicList(getMicListResponse.getMicListList())
.build());
log.info("推送客户端麦位变化消息成功 | roomId:{} | userId:{}", roomId, userId);
} catch (IllegalStateException illegalStateException) {
log.error("客户端长连接状态异常 | userId:{} | message", userId,illegalStateException.getMessage());
} catch (Exception e) {
log.error(String.format("推送客户端麦位变化消息异常 | roomId:%s | userId:%s", roomId, userId), e);
}
}
public String handleBlock(Long roomId, Long userId, GetMicListResponse getMicListResponse, BlockException ex) {
return "Blocked for user: " + userId;
}
@Component
public class SentinelConfig {
@PostConstruct
public void init() {
// 创建参数流控规则
List<ParamFlowRule> rules = new ArrayList<>();
// 创建参数并发规则
ParamFlowRule rule = new ParamFlowRule();
rule.setResource("sendMicChangeMessage"); // 资源名称,与业务接口的 @SentinelResource 注解中的 value 值对应
rule.setParamIdx(1); // 参数索引,0 表示第一个参数
rule.setGrade(RuleConstant.FLOW_GRADE_QPS); // 限流模式为 QPS
rule.setCount(10); // 每秒最大请求数为 10
rule.setControlBehavior(RuleConstant.CONTROL_BEHAVIOR_RATE_LIMITER); // 并发限制模式为 RateLimiter
rule.setMaxQueueingTimeMs(100); // 最大排队等待时间为 100 毫秒
rules.add(rule);
ParamFlowRuleManager.loadRules(rules);
}
}
ok,至此,整个gRpc服务1.0阶段的全链路架构、使用方式都已完毕,方便并不完美,敬请期待后续2.0版本的升级。
转载自:https://juejin.cn/post/7234543902400397373