likes
comments
collection
share

Spring Boot集成@Async快速入门Demo

作者站长头像
站长
· 阅读数 28

1.什么是@Async?

当我们在使用SpringBoot进行开发的时候,可能会遇到一些执行异步任务的场景,如果每次执行这些异步任务都去手动新建一个线程来执行的话,那代码就太冗余了。幸好Spring给我们提供了@Async注解,让我们能够很轻松地执行这些异步任务。

失效条件

  1. 异步方法使用static修饰
  2. 调用方法和异步方法在同一个类中(同类内部调用不会经过Spring生成的代理对象,@Async因此不会生效)

2.代码工程

**实验目标:验证@Async异步任务**

pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<!-- Maven build descriptor for the "async" demo module. -->
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <!-- Inherits from the com.et:springboot-demo parent. No dependency versions are
         declared in this file; presumably they are managed by the parent (confirm). -->
    <parent>
        <artifactId>springboot-demo</artifactId>
        <groupId>com.et</groupId>
        <version>1.0-SNAPSHOT</version>
    </parent>
    <modelVersion>4.0.0</modelVersion>

    <artifactId>async</artifactId>

    <!-- Compile for Java 8 source and bytecode level. -->
    <properties>
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
    </properties>
    <dependencies>

        <!-- Spring Boot web starter. -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>

        <!-- NOTE(review): likely redundant, since the Boot starters normally bring
             spring-boot-autoconfigure in transitively; confirm before removing. -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-autoconfigure</artifactId>
        </dependency>
        <!-- Test-scoped starter used by the test methods of this demo. -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
        <!-- Lombok: provides the @Slf4j annotation used by the service classes. -->
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
        </dependency>
    </dependencies>
</project>

service

  • thenApply :

    处理上一阶段计算结果

  • thenCompose:

    将上一阶段的结果传给另一个返回CompletableFuture的函数,把两个异步阶段串联起来(结果不会嵌套成CompletableFuture<CompletableFuture<T>>)

    package com.et.async.service;

    import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.scheduling.annotation.Async; import org.springframework.scheduling.annotation.AsyncResult; import org.springframework.stereotype.Component; import org.springframework.stereotype.Service;

    import java.util.concurrent.CompletableFuture; import java.util.concurrent.Future;

    /**
     * Demo service that exercises the different flavours of {@code @Async} methods:
     * a plain method, void async methods bound to the {@code threadPoolTaskExecutor}
     * bean, an async method returning a {@link Future}, and a method that merges the
     * results of two other async services.
     *
     * @author liuhaihua
     * @version 1.0
     */
    @Service
    @Slf4j
    public class NotifyService {

        @Autowired
        private FirstAsyncService firstService;

        @Autowired
        private SecondAsyncService secondService;

        /**
         * Logs the name of the current thread. This method carries no {@code @Async}
         * annotation (the log text says "asynchronously" regardless), so it serves as
         * the baseline to compare against {@link #withAsync()}.
         */
        public void noAsync() {
            log.info("Execute method asynchronously. " + Thread.currentThread().getName());
        }

        /**
         * Logs the name of the current thread; annotated to run on the executor
         * named {@code threadPoolTaskExecutor}.
         */
        @Async("threadPoolTaskExecutor")
        public void withAsync() {
            log.info("Execute method asynchronously. " + Thread.currentThread().getName());
        }

        /**
         * Deliberately fails with an {@link ArithmeticException} (integer division by
         * zero) so that the handling of exceptions thrown by void async methods can
         * be observed.
         */
        @Async("threadPoolTaskExecutor")
        public void mockerror() {
            int ss = 12 / 0;
        }

        /**
         * Simulates five seconds of work and then yields a greeting.
         *
         * @return a future holding {@code "hello world !!!!"}, or {@code null} when the
         *         simulated work is interrupted
         */
        @Async
        public Future<String> asyncMethodWithReturnType() {
            log.info("Execute method asynchronously - " + Thread.currentThread().getName());
            try {
                Thread.sleep(5000);
                return CompletableFuture.completedFuture("hello world !!!!");
            } catch (InterruptedException e) {
                // Restore the interrupt flag so the owning executor can still observe it.
                Thread.currentThread().interrupt();
                log.warn("Interrupted while simulating asynchronous work", e);
                return null;
            }
        }

        /**
         * Requests data from both async services and concatenates the two responses,
         * first service's value followed by the second's.
         *
         * @return a future completing with the concatenated responses
         * @throws InterruptedException declared by the underlying service calls
         */
        public CompletableFuture<String> asyncMergeServicesResponse() throws InterruptedException {
            // Both calls are issued before either result is consumed.
            CompletableFuture<String> firstServiceResponse = firstService.asyncGetData();
            CompletableFuture<String> secondServiceResponse = secondService.asyncGetData();

            // Merge responses from FirstAsyncService and SecondAsyncService
            return firstServiceResponse.thenCompose(
                    firstServiceValue -> secondServiceResponse.thenApply(
                            secondServiceValue -> firstServiceValue + secondServiceValue));
        }
    }

    package com.et.async.service;

    import lombok.extern.slf4j.Slf4j; import org.springframework.scheduling.annotation.Async; import org.springframework.scheduling.annotation.AsyncResult; import org.springframework.stereotype.Service;

    import java.util.concurrent.CompletableFuture;

    /**
     * First of the two demo services whose asynchronous responses are merged by
     * {@code NotifyService}.
     *
     * @author liuhaihua
     * @version 1.0
     * @date 2024-05-10 16:24
     */
    @Service
    @Slf4j
    public class FirstAsyncService {

        /**
         * Simulates four seconds of work and then produces this class's simple name
         * followed by {@code " response !!! "}.
         *
         * @return an already-completed future holding the response text
         * @throws InterruptedException if the thread is interrupted while sleeping
         */
        @Async
        public CompletableFuture<String> asyncGetData() throws InterruptedException {
            log.info("Execute method asynchronously " + Thread.currentThread().getName());
            Thread.sleep(4000);
            return CompletableFuture.completedFuture(getClass().getSimpleName() + " response !!! ");
        }
    }

    package com.et.async.service;

    import lombok.extern.slf4j.Slf4j; import org.springframework.scheduling.annotation.Async; import org.springframework.scheduling.annotation.AsyncResult; import org.springframework.stereotype.Service;

    import java.util.concurrent.CompletableFuture;

    /**
     * Second of the two demo services whose asynchronous responses are merged by
     * {@code NotifyService}.
     *
     * @author liuhaihua
     * @version 1.0
     * @date 2024-05-10 16:24
     */
    @Service
    @Slf4j
    public class SecondAsyncService {

        /**
         * Simulates four seconds of work and then produces this class's simple name
         * followed by {@code " response !!! "}.
         *
         * @return an already-completed future holding the response text
         * @throws InterruptedException if the thread is interrupted while sleeping
         */
        @Async
        public CompletableFuture<String> asyncGetData() throws InterruptedException {
            log.info("Execute method asynchronously " + Thread.currentThread().getName());
            Thread.sleep(4000);
            return CompletableFuture.completedFuture(getClass().getSimpleName() + " response !!! ");
        }
    }

config

在配置类上添加@EnableAsync注解,即可启用异步方法支持

package com.et.async.config;

import com.et.async.exception.CustomAsyncExceptionHandler;
import org.springframework.aop.interceptor.AsyncUncaughtExceptionHandler;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.AsyncConfigurer;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;

import java.util.concurrent.Executor;

/**
 * Switches on asynchronous method execution via {@code @EnableAsync} and, through
 * {@link AsyncConfigurer}, supplies an executor and the handler for exceptions
 * escaping async methods.
 */
@Configuration
@EnableAsync
public class SpringAsyncConfig implements AsyncConfigurer {

    /**
     * Registers a {@link ThreadPoolTaskExecutor} with default settings under the
     * bean name {@code threadPoolTaskExecutor}.
     *
     * @return the executor bean
     */
    @Bean(name = "threadPoolTaskExecutor")
    public Executor threadPoolTaskExecutor() {
        final ThreadPoolTaskExecutor namedPool = new ThreadPoolTaskExecutor();
        return namedPool;
    }

    /**
     * Builds a fresh {@link ThreadPoolTaskExecutor} with default settings on every call.
     * The instance is not declared as a bean, so it is initialized by hand here.
     *
     * @return the initialized executor
     */
    @Override
    public Executor getAsyncExecutor() {
        final ThreadPoolTaskExecutor defaultPool = new ThreadPoolTaskExecutor();
        defaultPool.initialize();
        return defaultPool;
    }

    /**
     * Supplies the handler for exceptions that escape async methods.
     *
     * @return a new {@link CustomAsyncExceptionHandler}
     */
    @Override
    public AsyncUncaughtExceptionHandler getAsyncUncaughtExceptionHandler() {
        final AsyncUncaughtExceptionHandler handler = new CustomAsyncExceptionHandler();
        return handler;
    }
}

异常类

对于无返回值的异步任务,配置CustomAsyncExceptionHandler类,统一处理无法捕获的异常

package com.et.async.exception;

import org.springframework.aop.interceptor.AsyncUncaughtExceptionHandler;

import java.lang.reflect.Method;

/**
 * Handler for exceptions escaping async methods: prints the exception message, the
 * name of the failing method and each argument value to standard output.
 */
public class CustomAsyncExceptionHandler implements AsyncUncaughtExceptionHandler {

    /**
     * Reports the failure on standard output, one line per item.
     *
     * @param throwable the exception that was thrown
     * @param method    the method that threw it
     * @param obj       the arguments the method was invoked with
     */
    @Override
    public void handleUncaughtException(Throwable throwable, Method method, Object... obj) {
        final String message = throwable.getMessage();
        System.out.println("Exception message - " + message);

        final String methodName = method.getName();
        System.out.println("Method name - " + methodName);

        for (int i = 0; i < obj.length; i++) {
            System.out.println("Parameter value - " + obj[i]);
        }
    }
}

以上只是一些关键代码,所有代码请参见下面代码仓库

代码仓库

3.测试

测试async异步任务

/**
 * Calls the plain and the {@code @Async} variant back to back so the thread names
 * they log can be compared with the calling thread's name.
 */
@Test
public void execute() throws ExecutionException, InterruptedException {
    final String callerThread = Thread.currentThread().getName();
    log.info("your method test Code");
    log.info("Invoking an asynchronous method. " + callerThread);

    notifyService.noAsync();
    notifyService.withAsync();
}

测试带线程池的异步任务

/**
 * Always fails: the integer division by zero below throws an
 * {@link ArithmeticException}. Annotated to run on the executor named
 * {@code threadPoolTaskExecutor}.
 */
@Async("threadPoolTaskExecutor")
public void mockerror() {
    // Intentional division by zero to provoke the exception.
    int ss=12/0;
}

测试带返回值的异步方法

/**
 * Starts the async method that returns a {@link Future}, keeps logging once per
 * second while the result is pending, then logs the result.
 */
@Test
public void testAsyncAnnotationForMethodsWithReturnType()
        throws InterruptedException, ExecutionException {
    log.info("Invoking an asynchronous method. " + Thread.currentThread().getName());
    final Future<String> pending = notifyService.asyncMethodWithReturnType();

    // Poll until the asynchronous work has finished.
    while (!pending.isDone()) {
        log.info("Continue doing something else. ");
        Thread.sleep(1000);
    }
    log.info("Result from asynchronous process - " + pending.get());
}

测试多个异步任务合并结果

/**
 * Requests the merged response of the two async services, keeps logging once per
 * second while it is pending, then logs the merged result.
 */
@Test
public void testAsyncAnnotationForMergedServicesResponse() throws InterruptedException, ExecutionException {
    log.info("Invoking an asynchronous method. " + Thread.currentThread().getName());
    final CompletableFuture<String> merged = notifyService.asyncMergeServicesResponse();

    // Poll until both service responses have been combined.
    while (!merged.isDone()) {
        log.info("Continue doing something else. ");
        Thread.sleep(1000);
    }
    log.info("Result from asynchronous process - " + merged.get());
}

测试void方法异常捕获

/**
 * Invokes the service method that deliberately divides by zero. The test makes no
 * assertions; the outcome is presumably inspected through the exception handler's
 * console output (confirm).
 */
@Test
public void mockerror() throws ExecutionException, InterruptedException {
    notifyService.mockerror();
}

4.引用参考

转载自:https://juejin.cn/post/7367923380345896997
评论
请登录