InheritableThreadLocal在ThreadPoolExecutor中无法传递参数
前言
最近遇到一个线上问题,我们需要在线程池中去解析数据、入库、发送MQ消息。在入库和发送消息的过程中会有监控,需要输出traceId,但是在后续排查问题的过程中发现了很多不同的业务打印的traceId都是一样的,因为我们的traceId中包含了生成的日期,我竟然发现今天输出的traceId是之前生成的,不是今天的。这就很奇怪。但好在我们有日志上下文,通过分析堆栈信息发现在主线程生成traceId的时候是正确的,但是在线程池中处理任务的时候打印的traceId就是之前的。因为traceId是使用InheritableThreadLocal
来进行传递的,理论上子线程是可以拿到父线程在ThreadLocal中设置的traceId的,但是只能在自己new Thread()
这种方式创建出来的线程才不会有问题,在ThreadPoolExecutor中就不行!!!
问题复现
下面我将通过一个例子来复现这个过程。
- 创建一个用来保存traceId的
RequestContext
类
public class RequestContext {
// 将traceId存到InheritableThreadLocal中
public static ThreadLocal<String> THREAD_CONTEXT_HOLDER = new InheritableThreadLocal<>();
public static void set(String value) {
THREAD_CONTEXT_HOLDER.set(value);
}
public static String get() {
return THREAD_CONTEXT_HOLDER.get();
}
}
- 创建一个任务类
MyTask
,用来模拟任务的执行
pulbic class MyTask implements Runnable {
@Override
public void run() {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
String value = RequestContext.get();
System.out.println("线程池获取到的值:" + value);
}
}
- 在主线程中传递
public static void main(String[] args) throws InterruptedException {
RequestContext.set("111111111");
String value = RequestContext.get();
System.out.println("主线程RequestContext获取到的值: " + value);
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(1, 1, 1, TimeUnit.MINUTES, new ArrayBlockingQueue<>(100));
threadPoolExecutor.execute(new MyTask());
RequestContext.set("222222222");
threadPoolExecutor.execute(new MyTask());
}
我们期望的输出结果应该是:
主线程RequestContext获取到的值: 111111111
线程池获取到的值:111111111
线程池获取到的值:222222222
但是实际的执行结果却是:
主线程RequestContext获取到的值: 111111111
线程池获取到的值:111111111
线程池获取到的值:111111111
问题分析
这个问题就有点奇怪,你要说InheritableThreadLocal
没用吧,他还能获取到主线程设置的111111111,你要说他有用吧,主线程第二次设置的222222222他就拿不到了。这个问题的根源还是在于Thread
对象。从InheritableThreadLocal.getMap()方法中可以看到,线程共享值是通过t.inheritableThreadLocals
的方式获取的
在Thread对象内部会有两个属性
threadLocals
和inheritableThreadLocals
:
他们分别是用来给ThreadLocal和InheritableThreadLocal这两个对象来保存共享变量的。既然在子线程无法获取值,那么肯定跟
inheritableThreadLocals
这个变量有关,而这个变量只在new Thread
的时候有赋值,找到那个最底层的构造器看看:
在这里将父线程的
inheritableThreadLocals
传递给了当前创建出来的子线程。把它跟线程池联系在一起就能想明白,线程池里面的线程是复用的,第一次线程被new的时候,将父线程的inheritableThreadLocals
传递给了子线程,在线程使用完之后并没有被回收,而是存到了线程池里面,也就是说此时这个线程的inheritableThreadLocals
里面保存的只会是我们第一次设置的值,后面在父线程中在怎么设置也不会被传递到子线程。
解决方案
重写ThreadPoolExecutor
既然无法通过Thread
的inheritableThreadLocals
属性来传递数据,那么我们能不能自己去传递?父线程和子线程的交接点是任务,让父线程在创建任务的时候把所要传递的数据传给任务,那么在子线程执行任务的时候就可以拿到了。类似这样的代码:
RequestContext.set("111111111");
ThreadPoolExecutor threadPoolExecutor = new MyPool(1, 1, 1, TimeUnit.MINUTES, new ArrayBlockingQueue<>(100));
threadPoolExecutor.execute(new MyTask(RequestContext.get()));
如要真要这么做的话会存在以下问题:
- 在MyTask对象里面维护一个traceId的属性,凭什么在创建我MyTask的时候还要依赖你的traceId呢?缺乏单一性!!!
- 很多任务都需要traceId,难道我很多任务都要维护一个traceId属性吗?缺乏封装性!!!
基于存在以上的两个问题,所以我又想到了一个更好的办法,大家都知道的一句话:
计算机科学中没有解决不了的问题,只要再增加一个间接层就可以了,除了过多的间接层的问题。
我们可以加一个中间层,把父线程的traceId拿出来存在子线程的ThreadLocal里,这样我们就不需要更改原有的MyTask类了。
- 创建中间层
MyTaskWrapper
类
- 将父线程中的值保存下来
- 将我们真正执行任务的MyTask封装进去等待调用。
public class MyTaskWrapper implements Runnable {
private final Runnable targetRunnable;
// 获取到父线程中设置的traceId
private final String traceId = RequestContext.get();
public MyTaskWrapper(Runnable targetRunnable) {
this.targetRunnable = targetRunnable;
}
@Override
public void run() {
// 将父线程获取到的traceId存到子线程的ThreadLocal里面
if (traceId != null) {
RequestContext.set(traceId);
}
// 执行真正的任务
targetRunnable.run();
}
}
- 再创建一个类
ThreadPoolWrapper
,继承ThreadPoolExecutor
:
static class ThreadPoolWrapper extends ThreadPoolExecutor {
public ThreadPoolWrapper(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) {
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
}
@Override
public void execute(Runnable command) {
super.execute(new MyTaskWrapper(command));
}
}
- 调用方代码只需要稍稍做一点改动,把
ThreadPoolExecutor
换成ThreadPoolWrapper
即可:
改动前:
改动后:
输出结果:
主线程RequestContext获取到的值: 111111111
线程池获取到的值:111111111
线程池获取到的值:222222222
TransmittableThreadLocal
类似的问题能让我遇到,说明肯定有人也遇到过。阿里开源了一款用来解决这个问题的框架:TransmittableThreadLocal
。接下来看看如何使用:
- 引入依赖包:
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>transmittable-thread-local</artifactId>
<version>2.14.2</version>
</dependency>
- 将线程池封装为
ExecutorServiceTtlWrapper
,再调用封装之后的线程池的execute()方法,由于ExecutorServiceTtlWrapper也实现了ExecutorService接口,所以我们可以像正常的线程池一样使用即可。
public static void main(String[] args) {
RequestContext.set("1111111");
System.out.println("主线程RequestContext获取到的值: " + RequestContext.get());
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(1, 1, 100, TimeUnit.SECONDS, new ArrayBlockingQueue<>(100));
ExecutorService ttlExecutorService = TtlExecutors.getTtlExecutorService(threadPoolExecutor);
ttlExecutorService.execute(new MyTask());
RequestContext.set("2222222");
ttlExecutorService.execute(new MyTask());
}
运行结果:
主线程RequestContext获取到的值: 1111111
线程池获取到的值:1111111
线程池获取到的值:2222222
其实这种方式和第一种咱们自己写的方式思路差不多,都是多套了一层逻辑处理。具体的实现原理会在后面的文章中单独详解。
只有先改变自己的态度,才能改变人生的高度。
转载自:https://juejin.cn/post/7337854138515390490