likes
comments
collection
share

InheritableThreadLocal在ThreadPoolExecutor中无法传递参数

作者站长头像
站长
· 阅读数 5

前言

最近遇到一个线上问题,我们需要在线程池中去解析数据、入库、发送MQ消息。在入库和发送消息的过程中会有监控,需要输出traceId,但是在后续排查问题的过程中发现了很多不同的业务打印的traceId都是一样的,因为我们的traceId中包含了生成的日期,我竟然发现今天输出的traceId是之前生成的,不是今天的。这就很奇怪。但好在我们有日志上下文,通过分析堆栈信息发现在主线程生成traceId的时候是正确的,但是在线程池中处理任务的时候打印的traceId就是之前的。因为traceId是使用InheritableThreadLocal来进行传递的,理论上子线程是可以拿到父线程在ThreadLocal中设置的traceId的,但是只能在自己new Thread()这种方式创建出来的线程才不会有问题,在ThreadPoolExecutor中就不行!!!

问题复现

下面我将通过一个例子来复现这个过程。

  1. 创建一个用来保存traceId的RequestContext
public class RequestContext {

    // 将traceId存到InheritableThreadLocal中
    public static ThreadLocal<String> THREAD_CONTEXT_HOLDER = new InheritableThreadLocal<>();

    public static void set(String value) {
        THREAD_CONTEXT_HOLDER.set(value);
    }

    public static String get() {
        return THREAD_CONTEXT_HOLDER.get();
    }
}
  1. 创建一个任务类MyTask,用来模拟任务的执行
pulbic class MyTask implements Runnable {

        @Override
        public void run() {
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
            String value = RequestContext.get();
            System.out.println("线程池获取到的值:" + value);
        }
    }

  1. 在主线程中传递
   public static void main(String[] args) throws InterruptedException {
        RequestContext.set("111111111");
        String value = RequestContext.get();
        System.out.println("主线程RequestContext获取到的值: " + value);

        ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(1, 1, 1, TimeUnit.MINUTES, new ArrayBlockingQueue<>(100));
        threadPoolExecutor.execute(new MyTask());
        RequestContext.set("222222222");
        threadPoolExecutor.execute(new MyTask());
   
   }

我们期望的输出结果应该是:

主线程RequestContext获取到的值: 111111111
线程池获取到的值:111111111
线程池获取到的值:222222222

但是实际的执行结果却是:

主线程RequestContext获取到的值: 111111111
线程池获取到的值:111111111
线程池获取到的值:111111111

问题分析

这个问题就有点奇怪,你要说InheritableThreadLocal没用吧,他还能获取到主线程设置的111111111,你要说他有用吧,主线程第二次设置的222222222他就拿不到了。这个问题的根源还是在于Thread对象。从InheritableThreadLocal.getMap()方法中可以看到,线程共享值是通过t.inheritableThreadLocals的方式获取的 InheritableThreadLocal在ThreadPoolExecutor中无法传递参数 在Thread对象内部会有两个属性threadLocalsinheritableThreadLocalsInheritableThreadLocal在ThreadPoolExecutor中无法传递参数 他们分别是用来给ThreadLocal和InheritableThreadLocal这两个对象来保存共享变量的。既然在子线程无法获取值,那么肯定跟inheritableThreadLocals这个变量有关,而这个变量只在new Thread的时候有赋值,找到那个最底层的构造器看看: InheritableThreadLocal在ThreadPoolExecutor中无法传递参数 在这里将父线程的inheritableThreadLocals传递给了当前创建出来的子线程。把它跟线程池联系在一起就能想明白,线程池里面的线程是复用的,第一次线程被new的时候,将父线程的inheritableThreadLocals传递给了子线程,在线程使用完之后并没有被回收,而是存到了线程池里面,也就是说此时这个线程的inheritableThreadLocals里面保存的只会是我们第一次设置的值,后面在父线程中在怎么设置也不会被传递到子线程。

解决方案

重写ThreadPoolExecutor

既然无法通过ThreadinheritableThreadLocals属性来传递数据,那么我们能不能自己去传递?父线程和子线程的交接点是任务,让父线程在创建任务的时候把所要传递的数据传给任务,那么在子线程执行任务的时候就可以拿到了。类似这样的代码:

RequestContext.set("111111111");
ThreadPoolExecutor threadPoolExecutor = new MyPool(1, 1, 1, TimeUnit.MINUTES, new ArrayBlockingQueue<>(100));
threadPoolExecutor.execute(new MyTask(RequestContext.get()));

如要真要这么做的话会存在以下问题:

  1. 在MyTask对象里面维护一个traceId的属性,凭什么在创建我MyTask的时候还要依赖你的traceId呢?缺乏单一性!!!
  2. 很多任务都需要traceId,难道我很多任务都要维护一个traceId属性吗?缺乏封装性!!!

基于存在以上的两个问题,所以我又想到了一个更好的办法,大家都知道的一句话:

计算机科学中没有解决不了的问题,只要再增加一个间接层就可以了,除了过多的间接层的问题。

我们可以加一个中间层,把父线程的traceId拿出来存在子线程的ThreadLocal里,这样我们就不需要更改原有的MyTask类了。

  1. 创建中间层MyTaskWrapper
  • 将父线程中的值保存下来
  • 将我们真正执行任务的MyTask封装进去等待调用。
public class MyTaskWrapper implements Runnable {

    private final Runnable targetRunnable;

    // 获取到父线程中设置的traceId
    private final String traceId = RequestContext.get();

    public MyTaskWrapper(Runnable targetRunnable) {
        this.targetRunnable = targetRunnable;
    }

    @Override
    public void run() {
        // 将父线程获取到的traceId存到子线程的ThreadLocal里面
        if (traceId != null) {
            RequestContext.set(traceId);
        }
        // 执行真正的任务
        targetRunnable.run();
    }
}
  1. 再创建一个类ThreadPoolWrapper,继承ThreadPoolExecutor:
static class ThreadPoolWrapper extends ThreadPoolExecutor {

    public ThreadPoolWrapper(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
    }

    @Override
    public void execute(Runnable command) {
        super.execute(new MyTaskWrapper(command));
    }
}
  1. 调用方代码只需要稍稍做一点改动,把ThreadPoolExecutor换成ThreadPoolWrapper即可:

改动前: InheritableThreadLocal在ThreadPoolExecutor中无法传递参数

改动后: InheritableThreadLocal在ThreadPoolExecutor中无法传递参数 输出结果:

主线程RequestContext获取到的值: 111111111
线程池获取到的值:111111111
线程池获取到的值:222222222

TransmittableThreadLocal

类似的问题能让我遇到,说明肯定有人也遇到过。阿里开源了一款用来解决这个问题的框架:TransmittableThreadLocal。接下来看看如何使用:

  1. 引入依赖包:
<dependency>
  <groupId>com.alibaba</groupId>
  <artifactId>transmittable-thread-local</artifactId>
  <version>2.14.2</version>
</dependency>
  1. 将线程池封装为ExecutorServiceTtlWrapper,再调用封装之后的线程池的execute()方法,由于ExecutorServiceTtlWrapper也实现了ExecutorService接口,所以我们可以像正常的线程池一样使用即可。
public static void main(String[] args) {

    RequestContext.set("1111111");
    System.out.println("主线程RequestContext获取到的值: " + RequestContext.get());
    ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(1, 1, 100, TimeUnit.SECONDS, new ArrayBlockingQueue<>(100));
    ExecutorService ttlExecutorService = TtlExecutors.getTtlExecutorService(threadPoolExecutor);
    ttlExecutorService.execute(new MyTask());
    RequestContext.set("2222222");
    ttlExecutorService.execute(new MyTask());
}

运行结果:

主线程RequestContext获取到的值: 1111111
线程池获取到的值:1111111
线程池获取到的值:2222222

其实这种方式和第一种咱们自己写的方式思路差不多,都是多套了一层逻辑处理。具体的实现原理会在后面的文章中单独详解。

只有先改变自己的态度,才能改变人生的高度。