
Spring Cloud: Querying Trace Logs Across Asynchronous Threads (with Examples)

Author: 站长

Overview

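A distributed system has many nodes, so tracking down an error usually means reading logs on several of them. Without a unique request ID that ties together the log entries written by each node, that search is slow and laborious. Spring Cloud Sleuth was created to solve this (it is based on Google's Dapper paper, which is worth reading for the details). At the entry point of a request, Sleuth uses a Filter to generate a unique identifier, the TraceId. The TraceId then travels with the request to every node on its path, and every log line written along the way includes it, as shown in the figure below. (A SpanId is normally generated as well; it is left out of the figure to keep things simple.)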

[Figure: a TraceId following a request across service nodes]

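When a problem occurs in production, the TraceId lets you quickly find the matching request logs on every node. The one drawback is that the TraceId is lost when work is executed asynchronously, so this article explains how to keep the TraceId when crossing threads.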
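
The loss is easy to reproduce outside Sleuth. The sketch below is plain Java and assumes, for illustration only, that the trace context is kept in a ThreadLocal; `LostTraceIdDemo` and `TRACE_ID` are hypothetical names, not Sleuth classes. A value set on the request thread is not visible to a task running on a pool thread:

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

final class LostTraceIdDemo {

    // Hypothetical stand-in for a per-thread trace context (not a Sleuth class)
    static final ThreadLocal<String> TRACE_ID = new ThreadLocal<>();

    /** Sets the id on the calling thread and returns what a pool thread sees. */
    static String traceIdSeenByWorker(String traceId) {
        ExecutorService pool = Executors.newFixedThreadPool(1);
        TRACE_ID.set(traceId);
        try {
            Callable<String> readOnWorker = () -> TRACE_ID.get();
            return pool.submit(readOnWorker).get();
        } catch (Exception e) {
            throw new IllegalStateException(e);
        } finally {
            TRACE_ID.remove();
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        // The worker thread has its own, empty ThreadLocal slot
        System.out.println("worker sees: " + traceIdSeenByWorker("fae1c9449e12695f"));
        // prints "worker sees: null"
    }
}
```

This is the gap that the configurations in the rest of the article are meant to close.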

The official documentation describes asynchronous TraceId propagation, as shown below:

[Figure: screenshot of the Sleuth documentation on asynchronous TraceId propagation]

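In short, Sleuth propagates the TraceId through @Async by default, and this can be switched on or off with spring.sleuth.async.enabled. It also provides the following executor wrapper classes to pass the TraceId across threads:

  • LazyTraceExecutor
  • TraceableExecutorService
  • TraceableScheduledExecutorService

TraceableScheduledExecutorService implements the ScheduledExecutorService interface and is used for scheduled tasks. In my experience that need is less common, so this article covers only the more frequently used setups: @Async, thread pools, and EventBus. Each is described in the sections that follow.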
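
Conceptually, a wrapper of this kind captures the caller's context when a task is submitted and restores it on the worker thread before the task runs. The following plain-Java sketch shows only that idea, assuming a ThreadLocal-based context. `PropagatingExecutor` and `TRACE_ID` are hypothetical names, and this is not how Sleuth's classes are actually implemented (Sleuth also gives the task its own SpanId, which this sketch leaves out):

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

final class PropagatingExecutorDemo {

    // Hypothetical stand-in for a per-thread trace context (not a Sleuth class)
    static final ThreadLocal<String> TRACE_ID = new ThreadLocal<>();

    /** Wraps a delegate so that each task runs with the submitter's trace id. */
    static final class PropagatingExecutor implements Executor {
        private final Executor delegate;

        PropagatingExecutor(Executor delegate) {
            this.delegate = delegate;
        }

        @Override
        public void execute(Runnable task) {
            String captured = TRACE_ID.get();      // read on the submitting thread
            delegate.execute(() -> {
                String previous = TRACE_ID.get();
                TRACE_ID.set(captured);            // install on the worker thread
                try {
                    task.run();
                } finally {
                    // Restore the worker's earlier state so ids do not leak between tasks
                    if (previous == null) {
                        TRACE_ID.remove();
                    } else {
                        TRACE_ID.set(previous);
                    }
                }
            });
        }
    }

    /** Sets the id on the calling thread and returns what the wrapped pool's worker sees. */
    static String traceIdSeenByWorker(String traceId) {
        ExecutorService pool = Executors.newFixedThreadPool(1);
        Executor traced = new PropagatingExecutor(pool);
        TRACE_ID.set(traceId);
        try {
            CompletableFuture<String> seen =
                    CompletableFuture.supplyAsync(() -> TRACE_ID.get(), traced);
            return seen.join();
        } finally {
            TRACE_ID.remove();
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("worker sees: " + traceIdSeenByWorker("fae1c9449e12695f"));
        // prints "worker sees: fae1c9449e12695f"
    }
}
```

The capture happens inside `execute`, on the caller's thread, which is why the pool has to be wrapped rather than the task body changed.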

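Async configuration

By default Sleuth propagates the TraceId across @Async calls. With a custom thread pool, however, a wrong configuration can break propagation because of a bug in Spring in this area. For details, see the following links: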

github.com/spring-proj…

github.com/spring-proj…

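The configuration options are therefore the following three.

Configuration methods

Method 1 (recommended)

Here Sleuth's LazyTraceExecutor wraps the thread pool, which ensures that the trace object is passed on to the next thread.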

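import java.util.concurrent.Executor;

import org.springframework.beans.factory.BeanFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.config.BeanDefinition;
import org.springframework.cloud.sleuth.instrument.async.LazyTraceExecutor;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Role;
import org.springframework.scheduling.annotation.AsyncConfigurerSupport;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;

@Configuration
@EnableAsync
@Role(BeanDefinition.ROLE_INFRASTRUCTURE)
public class SpringAsyncConfig extends AsyncConfigurerSupport {

    @Autowired
    private BeanFactory beanFactory;

    @Override
    public Executor getAsyncExecutor() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(10);
        executor.setMaxPoolSize(10);
        executor.setQueueCapacity(500);
        executor.setThreadNamePrefix("AsyncExecutor-");
        executor.initialize();
        // Wrap the pool so the trace context follows each task onto the worker thread
        return new LazyTraceExecutor(this.beanFactory, executor);
    }

}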

Method 2

At startup, Sleuth looks for a TaskExecutor bean to use as the @Async thread pool; if none is found, the default thread pool is used.

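import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.task.TaskExecutor;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;

@EnableAsync
@Configuration
public class WebConfig {
    // Exposed as a TaskExecutor bean so that it is picked up as the @Async pool
    @Bean
    public TaskExecutor getAsyncExecutor() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(10);
        executor.setMaxPoolSize(10);
        executor.setQueueCapacity(500);
        executor.setThreadNamePrefix("AsyncExecutor-");
        executor.initialize();
        return executor;
    }
}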

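Method 3

If no thread pool is configured at all and the project only adds the @EnableAsync annotation, Spring's SimpleAsyncTaskExecutor is used. It creates a new thread for every invocation, so a high call volume produces a very large number of threads. System resources are limited, and too many threads put pressure on memory and can end in an OutOfMemoryError, so this method is not recommended.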
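
To make the difference concrete, the sketch below counts how many distinct threads end up running a batch of tasks. It is plain Java with no Spring on the classpath: the thread-per-task executor is a hypothetical stand-in that mimics the one-new-thread-per-call behaviour described above, not SimpleAsyncTaskExecutor itself, and `ThreadPerTaskDemo` is an illustrative name.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

final class ThreadPerTaskDemo {

    /** Runs the given number of tasks and returns how many distinct threads executed them. */
    static int distinctThreads(Executor executor, int tasks) {
        Set<Thread> seen = ConcurrentHashMap.newKeySet();
        CountDownLatch done = new CountDownLatch(tasks);
        for (int i = 0; i < tasks; i++) {
            executor.execute(() -> {
                seen.add(Thread.currentThread());
                done.countDown();
            });
        }
        try {
            done.await();                          // wait until every task has run
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return seen.size();
    }

    /** Stand-in for an executor that starts a brand-new thread for each task. */
    static int withThreadPerTask(int tasks) {
        Executor threadPerTask = task -> new Thread(task).start();
        return distinctThreads(threadPerTask, tasks);
    }

    /** Same workload on a bounded pool that reuses its threads. */
    static int withFixedPool(int poolSize, int tasks) {
        ExecutorService pool = Executors.newFixedThreadPool(poolSize);
        try {
            return distinctThreads(pool, tasks);
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("thread per task: " + withThreadPerTask(200)); // 200 threads
        System.out.println("fixed pool of 10: " + withFixedPool(10, 200)); // at most 10 threads
    }
}
```

With thread-per-task execution the thread count grows with the number of calls, while the bounded pool caps it at the configured size, which is the reason Methods 1 and 2 configure a ThreadPoolTaskExecutor.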

Test and verification

Test code

Async configuration

@Configuration
@EnableAsync
@Role(BeanDefinition.ROLE_INFRASTRUCTURE)
public class SpringAsyncConfig extends AsyncConfigurerSupport {

    @Autowired
    private BeanFactory beanFactory;

    @Override
    public Executor getAsyncExecutor() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(10);
        executor.setMaxPoolSize(10);
        executor.setQueueCapacity(500);
        executor.setThreadNamePrefix("AsyncExecutor-");
        executor.initialize();
        return new LazyTraceExecutor(this.beanFactory, executor);
    }

}

Service

@Service
@Slf4j
public class TestService {

    @Async
    public void printAsyncLog() {
        log.info("async log.....");
    }

}

Controller

@Slf4j
@RestController
@RequestMapping("/test/async")
public class AsyncTestWeb {
    @Autowired
    private TestService testService;

    @RequestMapping(value = "/print/log", method = RequestMethod.GET)
    public String printLog() {
        log.info("sync log.....1.....");
        testService.printAsyncLog();
        log.info("sync log.....2.....");
        return "success";
    }
}

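Request test

Requesting /test/async/print/log produces the output below. The TraceId is the same on all three lines and only the SpanId changes; the thread name prefix AsyncExecutor matches the configured prefix.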

19:44:54.818, [fae1c9449e12695f fae1c9449e12695f] [http-nio-8080-exec-8] INFO  [] com.example.elkdemo.web.AsyncTestWeb printLog:30 - sync log.....1..... 
19:44:54.819, [fae1c9449e12695f fae1c9449e12695f] [http-nio-8080-exec-8] INFO  [] com.example.elkdemo.web.AsyncTestWeb printLog:32 - sync log.....2..... 
19:44:54.819, [fae1c9449e12695f 2d51edbb45896bd8] [AsyncExecutor-2] INFO  [] c.e.elkdemo.service.TestService printAsyncLog:50 - async log..... 

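Thread pool configuration

For a thread pool, TraceableExecutorService wraps an ExecutorService, and it needs a BeanFactory passed in when it is constructed. The setup therefore differs slightly depending on whether the pool is held as a global variable (a field) or as a local variable. The pool settings below are only sample code; adjust them to your needs in real use.

Global variable configuration

Constructor initialization (recommended)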

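import java.util.concurrent.Executors;

import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.BeanFactory;
import org.springframework.cloud.sleuth.instrument.async.TraceableExecutorService;
import org.springframework.stereotype.Service;

@Service
@Slf4j
public class TestService {

    private final BeanFactory beanFactory;
    private final TraceableExecutorService traceableExecutorService;

    // Spring injects the BeanFactory through the constructor
    public TestService(BeanFactory beanFactory) {
        this.beanFactory = beanFactory;
        this.traceableExecutorService = new TraceableExecutorService(beanFactory, Executors.newFixedThreadPool(10), "test");
    }

    /**
     * Logs asynchronously through the thread pool.
     */
    public void printThreadPoolLog() {
        traceableExecutorService.execute(() -> log.info("async thread pool log....."));
    }

}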

Singleton (lazy) initialization

@Service
@Slf4j
public class TestService {

    @Autowired
    private BeanFactory beanFactory;

    // volatile is required for double-checked locking to be safe
    private volatile TraceableExecutorService traceableExecutorService;

    public TraceableExecutorService getTraceableExecutorService() {
        if (traceableExecutorService == null) {
            // Lock on this bean instead of the shared TraceableExecutorService class object
            synchronized (this) {
                if (traceableExecutorService == null) {
                    traceableExecutorService = new TraceableExecutorService(beanFactory, Executors.newFixedThreadPool(10), "test");
                }
            }
        }
        return traceableExecutorService;
    }

    /**
     * Logs asynchronously through the thread pool.
     */
    public void printThreadPoolLog() {
        TraceableExecutorService executorService = getTraceableExecutorService();
        executorService.execute(() -> log.info("async thread pool log....."));
    }

}

Initialization through InitializingBean's afterPropertiesSet

@Service
@Slf4j
public class TestService implements InitializingBean {

    @Autowired
    private BeanFactory beanFactory;

    private TraceableExecutorService traceableExecutorService;

    // Called by Spring after beanFactory has been injected
    @Override
    public void afterPropertiesSet() {
        traceableExecutorService = new TraceableExecutorService(beanFactory, Executors.newFixedThreadPool(10), "test");
    }

    /**
     * Logs asynchronously through the thread pool.
     */
    public void printThreadPoolLog() {
        traceableExecutorService.execute(() -> log.info("async thread pool log....."));
    }

}

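Local variable configuration

/**
 * Logs asynchronously through a thread pool created inside the method.
 */
public void printThreadPoolLog2() {
    TraceableExecutorService executorService = new TraceableExecutorService(beanFactory, Executors.newFixedThreadPool(10), "test");
    executorService.execute(() -> log.info("async thread pool log....."));
    // Shut the pool down after submitting: queued tasks still run, and the worker threads are not leaked on every call
    executorService.shutdown();
}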

Test and verification

This test uses the global variable configuration.

Test code

Controller

@Slf4j
@RestController
@RequestMapping("/test/async")
public class AsyncTestWeb {
    @Autowired
    private TestService testService;

    @RequestMapping(value = "/print/threadPool/log", method = RequestMethod.GET)
    public String printThreadPoolLog() {
        log.info("sync log.....1.....");
        testService.printThreadPoolLog();
        log.info("sync log.....2.....");
        return "success";
    }
}

Service

The service is initialized through its constructor.

@Service
@Slf4j
public class TestService {

    private final BeanFactory beanFactory;
    private final TraceableExecutorService traceableExecutorService;

    public TestService(BeanFactory beanFactory) {
        this.beanFactory = beanFactory;
        this.traceableExecutorService = new TraceableExecutorService(beanFactory, Executors.newFixedThreadPool(10), "test");
    }

    /**
     * Logs asynchronously through the thread pool.
     */
    public void printThreadPoolLog() {
        traceableExecutorService.execute(() -> log.info("async thread pool log....."));
    }

}

Request test

Requesting /test/async/print/threadPool/log produces the output below. The TraceId is the same on all three lines and only the SpanId changes.

19:35:13.799, [884212fb58c658c5 884212fb58c658c5] [http-nio-8080-exec-5] INFO  [] com.example.elkdemo.web.AsyncTestWeb printThreadPoolLog:38 - sync log.....1..... 
19:35:13.801, [884212fb58c658c5 884212fb58c658c5] [http-nio-8080-exec-5] INFO  [] com.example.elkdemo.web.AsyncTestWeb printThreadPoolLog:40 - sync log.....2..... 
19:35:13.801, [884212fb58c658c5 70008b8d3a97602d] [pool-4-thread-2] INFO  [] c.e.elkdemo.service.TestService lambda$printThreadPoolLog$0:37 - async thread pool log..... 

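EventBus configuration

EventBus configuration is similar to thread pool configuration: pass a TraceableExecutorService to AsyncEventBus. Because TraceableExecutorService holds a reference to a BeanFactory instance, this is slightly more involved than the plain EventBus setup. Only constructor initialization is shown here; the other initialization styles mirror those in the thread pool section and are not repeated.

Constructor initialization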

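import java.util.concurrent.Executor;
import java.util.concurrent.Executors;

import com.google.common.eventbus.AsyncEventBus;
import com.google.common.eventbus.EventBus;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.BeanFactory;
import org.springframework.cloud.sleuth.instrument.async.TraceableExecutorService;
import org.springframework.stereotype.Component;

@Component
@Slf4j
public class PushEventBus {

    private final EventBus eventBus;

    public PushEventBus(BeanFactory beanFactory) {
        // Subscribers run on the traced pool, so they log with the poster's TraceId
        Executor traceableExecutorService = new TraceableExecutorService(beanFactory, Executors.newFixedThreadPool(10), "test");
        this.eventBus = new AsyncEventBus(traceableExecutorService);
    }

    public void register(Object obj) {
        eventBus.register(obj);
    }

    public void post(Object obj) {
        eventBus.post(obj);
    }

}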

Test and verification

Test code

EventBus

@Component
@Slf4j
public class PushEventBus {

    private EventBus eventBus;

    public PushEventBus(BeanFactory beanFactory) {
        Executor traceableExecutorService = new TraceableExecutorService(beanFactory, Executors.newFixedThreadPool(10), "test");
        this.eventBus = new AsyncEventBus(traceableExecutorService);
    }

    public void register(Object obj) {
        eventBus.register(obj);
    }

    public void post(Object obj) {
        eventBus.post(obj);
    }

}

Listener class

@Slf4j
public class EventListener {
  /**
   * Handles messages of type Integer.
   */
  @Subscribe
  public void listenInteger(Integer param) {
    log.info("EventListener#listenInteger->{}", param);
  }

  /**
   * Handles messages of type String.
   */
  @Subscribe
  public void listenString(String param) {
    log.info("EventListener#listenString->{}", param);
  }
}

Controller

@Slf4j
@RestController
@RequestMapping("/test/async")
public class AsyncTestWeb {

    @Autowired
    private PushEventBus pushEventBus;

    @RequestMapping(value = "/print/guava/log", method = RequestMethod.GET)
    public String printGuavaLog() {
        // Demo only: this registers a new listener on every request; real code should register once at startup
        pushEventBus.register(new EventListener());
        log.info("sync log.....1.....");
        pushEventBus.post("11");
        log.info("sync log.....2.....");
        return "success";
    }
}

Request test

Requesting /test/async/print/guava/log produces the output below. The TraceId is the same on all three lines and only the SpanId changes.

19:27:44.234, [50844e0d3909868c 50844e0d3909868c] [http-nio-8080-exec-3] INFO  [] com.example.elkdemo.web.AsyncTestWeb printGuavaLog:48 - sync log.....1..... 
19:27:44.236, [50844e0d3909868c 50844e0d3909868c] [http-nio-8080-exec-3] INFO  [] com.example.elkdemo.web.AsyncTestWeb printGuavaLog:50 - sync log.....2..... 
19:27:44.236, [50844e0d3909868c 702bf55c84873f17] [pool-3-thread-1] INFO  [] c.e.elkdemo.service.EventListener listenString:21 - EventListener#listenString->11 

The author's WeChat official account is 《架构成长指南》.