likes
comments
collection
share

【从0-1 千万级直播项目实战】线上拦截器中使用ThreadLocal失效问题排查

作者站长头像
站长
· 阅读数 58

背景

运营反馈,有用户在直播间内发送公屏,A用户发送的公屏 结果直播间内所有人员都显示了B用户发送的,而实际却是A用户发的。

原因分析

日志排查

  1. 询问运营要到问题截图

【从0-1 千万级直播项目实战】线上拦截器中使用ThreadLocal失效问题排查

  1. 定位公屏内容和出现问题的时间点 【从0-1 千万级直播项目实战】线上拦截器中使用ThreadLocal失效问题排查

3.查看log打印代码位置

【从0-1 千万级直播项目实战】线上拦截器中使用ThreadLocal失效问题排查

好家伙,发现服务端没有没错,实际发消息的用户ID和大家收到公屏的发送人不一样? 又让客户端排查了下日志,发现客户端A发送的,实际到了服务端从Token中获取到的当前发送公屏用户竟然是B?

代码分析

gRpc请求拦截器代码

@Slf4j(topic = SLSTopicType.TOPIC_GRPC)
public class CommonGrpcServerInterceptor implements ServerInterceptor {


    @Override
    public <ReqT, RespT> ServerCall.Listener<ReqT> interceptCall(ServerCall<ReqT, RespT> serverCall, Metadata metadata,
                                                                 ServerCallHandler<ReqT, RespT> serverCallHandler) {
        String token = metadata.get(Metadata.Key.of(MetadataConstants.AUTHENTICATION, Metadata.ASCII_STRING_MARSHALLER)),
                clientType = metadata.get(Metadata.Key.of(MetadataConstants.CLIENT_TYPE, Metadata.ASCII_STRING_MARSHALLER));

        if (StringUtils.isBlank(token)) {
            log.error("grpc请求异常,token为空.");
            serverCall.close(Status.UNAUTHENTICATED, null);
        }


        Long userId = JWTUtil.getUserId(token);

        log.info("grpc | token:{} | methodName:{} | userId:{}", token, serverCall.getMethodDescriptor().getFullMethodName(),
                userId);


        try {
            ThreadLocalUtil.set(ThreadLocalConstant.GRPC_USER_KEY,
                    GrpcThreadData.builder()
                            .userId(userId)
                            .clientType(Integer.valueOf(clientType))
                            .build());
            return serverCallHandler.startCall(serverCall, metadata);
        } finally {
            ThreadLocalUtil.remove(ThreadLocalConstant.GRPC_USER_KEY);
        }

    }
}

gRpc发送公屏消息接口

@Override
public void sendMessage(SendBarrageMessageRequest request, StreamObserver<SendBarrageMessageResponse> responseStreamObserver) {

    Long roomId = request.getRoomId(), currentUserId = GrpcUtil.getUserId();
    int type = request.getType().getNumber(), barrageType = BarrageInfo.MessageType.TEXT_VALUE;
    String content = request.getContent();

    log.info("用户发送房间公屏消息 | roomId:{} | currentUserId:{} | content:{} | type {}", roomId, currentUserId, content, type);

    rocketMqTemplate.send(RocketMqBizConstant.User.Cluster.BARRAGE_RECORD_SAVE_MSG, dto);
    String filterContent = content;
    if (type != SendBarrageMessageRequest.Type.EMOJI_VALUE) {
        if (SensitiveWordUtil.WORD_FILTER.include(StringUtils.deleteWhitespace(filterContent))) {
            filterContent = SensitiveWordUtil.WORD_FILTER.replace(StringUtils.deleteWhitespace(content));
        }
    }
    //用户发送公屏
    rocketMqTemplate.send(RocketMqBizConstant.Grpc.Broadcast.ROOM_RTMP_MESSAGE, GrpcRoomRtmpMessageDto.builder()
            .roomId(roomId)
            .pushType(RtmpPushType.BROADCAST.getType())
            .rtmpMessageType(RtmpMessage.MessageType.BARRAGE_VALUE)
            .userId(currentUserId)
            .sendUserId(currentUserId)
            .data(GsonUtil.GsonString(LiveRoomSendBarrageDto.builder()
                    .barrageType(barrageType)
                    .content(filterContent)
                    .build()))
            .build());

    responseStreamObserver.onNext(SendBarrageMessageResponse.newBuilder()
            .setSuccess(true)
            .setContent(filterContent)
            .build());

    responseStreamObserver.onCompleted();

}

每次gRpc请求进来前,在拦截其中解析token并将对应的用户ID设置到 ThreadLocal中,这样从接口中可以直接从ThreadLocal的值中获取请求的用户ID,看着好像没啥问题,难道是ThreadLocal设置的时候有线程安全问题?

来看一下ThreadLocal设置中的代码

public class ThreadLocalUtil {

    private static final ThreadLocal<Map<String, Object>> threadLocal = ThreadLocal.withInitial(() -> new HashMap<>(10));

    public static Map<String, Object> getThreadLocal() {
        return threadLocal.get();
    }

    public static Object get(String key) {
        Map<String, Object> map = threadLocal.get();
        return map.get(key);
    }

    public static void set(String key, Object value) {
        Map<String, Object> map = threadLocal.get();
        map.put(key, value);
    }

    public static void set(Map<String, Object> keyValueMap) {
        Map<String, Object> map = threadLocal.get();
        map.putAll(keyValueMap);
    }

    public static void remove() {
        threadLocal.remove();
    }

    public static <T> T remove(String key) {
        Map<String, Object> map = threadLocal.get();
        return (T) map.remove(key);
    }

}

ThreadLocal.withInitial(() -> new HashMap<>(10)); 方法可以为每个线程提供独立的初始值,确保每个线程都具有自己的初始状态,看起来是线程安全的,没啥毛病,那就排除了ThreadLocal的问题

继续排查日志-日志上下文分析

【从0-1 千万级直播项目实战】线上拦截器中使用ThreadLocal失效问题排查

好家伙,恍然大悟,拦截器中与gRpc接口代码执行的竟然不是同一个线程,大概率是gRpc框架的线程池处理线程不够用时使用线程切换造成,并发不大的时候还真看不出来~ 看到这的时候我已经打自己两巴掌了,因为在我看来这好像算是一个低级的错误...

使用ThreadLocal和gRpc框架提供Context的区别点

有请CHAT-GPT给我们解释一下两者的区别吧:

Context.current().withValue(key, value)ThreadLocalset 方法都可以用于在请求处理链路中传递数据,但它们之间有一些关键的区别:

  1. 作用范围:Context 是 gRPC 框架提供的上下文传递机制,可以在整个 gRPC 请求处理链路中传递数据。而 ThreadLocal 是基于线程的局部变量,只在同一线程中有效。

  2. 线程切换:当请求在 gRPC 框架中进行处理时,可能会涉及到线程的切换。在这种情况下,ThreadLocal 中的值无法在不同的线程之间共享,而 Context 可以跨线程传递数据。

  3. 显式传递:使用 Context 时,您需要在代码中显式地传递上下文对象。在 gRPC 请求处理链路中,通过调用 Context.current().withValue(key, value) 方法,您可以将数据与当前的上下文关联起来,并在后续的方法调用中获取该数据。

  4. 线程安全性:Context 在设计上是线程安全的,可以在多个线程之间共享数据。它是为了支持并发环境而设计的,可以在异步请求和并发请求处理中安全地传递数据。

总的来说,ContextThreadLocal 相比,更适用于 gRPC 框架中的请求处理链路,可以在不同的线程和方法调用中传递数据,提供更灵活的上下文传递机制。而 ThreadLocal 则更适用于在单个线程内部共享数据。

使用 Context 时需要注意上述区别,并根据需求选择合适的机制来传递数据。如果您需要在整个 gRPC 请求处理链路中传递数据,并且能够跨线程共享,那么使用 Context 是一个更好的选择。

总结

  1. ThreadLocal不要轻易使用在任意的拦截器中,除非你能保证拦截器的代码与实际执行的业务代码严格在同一个线程里,而不是使用线程池执行,否则在执行线程不够用时线程切换导致ThreadLocal读取失效
  2. 使用新组件时尽量使用组件提供的解决方案,这需要在使用前认真阅读文档,必要时多读读组件源码加深理解。
转载自:https://juejin.cn/post/7253497745629151290
评论
请登录