likes
comments
collection
share

掌握Java并发编程2:基本的任务执行框架-Executor、Executors和ExecutorService

作者站长头像
站长
· 阅读数 28

引言

并发编程一直是Java编程中最具挑战性的领域之一,它要求开发者对线程的创建、管理和同步有深刻的理解。自Java 5起,java.util.concurrent包引入了一系列并发工具类,大大简化了并发编程的复杂性。在这些工具中,Executor接口及其相关的Executors工具类和ExecutorService接口是处理线程池和任务执行的基础。它们抽象了线程的创建和管理,允许开发者将精力集中在任务的逻辑实现上,而不是线程的生命周期管理。本文将深入了解这些基本的任务执行框架,并展示如何在Java应用程序中有效地使用它们。

正文

原理

在Java中,java.util.concurrent包提供了一套强大的并发工具,其中包括了任务执行框架,它由Executor接口、Executors工具类和ExecutorService接口组成。以下是对这些组件的基本介绍:

  1. Executor接口: Executor是一个简单的接口,用于抽象任务的提交过程。它仅有一个execute(Runnable command)方法,用于接收一个Runnable对象并执行它。 使用Executor可以将任务提交和执行的细节分离开来,这样可以更灵活地管理线程的使用。

掌握Java并发编程2:基本的任务执行框架-Executor、Executors和ExecutorService 2. Executors工具类: Executors类提供了一系列静态工厂方法来创建不同类型的ExecutorService。这些方法可以快速创建预定义配置的线程池,例如:

  • Executors.newFixedThreadPool(int nThreads):创建一个固定大小的线程池。 掌握Java并发编程2:基本的任务执行框架-Executor、Executors和ExecutorService
  • Executors.newCachedThreadPool():创建一个可缓存线程的线程池。适用于有许多短生命周期的异步任务的场景。

掌握Java并发编程2:基本的任务执行框架-Executor、Executors和ExecutorService

  • Executors.newSingleThreadExecutor():创建一个单线程的Executor,确保所有任务按照提交顺序串行执行。

掌握Java并发编程2:基本的任务执行框架-Executor、Executors和ExecutorServicenewScheduledThreadPool(int corePoolSize): 创建一个固定线程数的线程池,并支持延迟或定期执行任务。适用于需要定时或周期性执行任务的场景。

掌握Java并发编程2:基本的任务执行框架-Executor、Executors和ExecutorService

掌握Java并发编程2:基本的任务执行框架-Executor、Executors和ExecutorService

  1. ExecutorService接口: ExecutorServiceExecutor的子接口,它提供了更完整的异步任务执行机制。除了execute方法外,它还提供了submit方法,可以提交实现了Callable接口的任务,并返回一个Future对象,通过这个对象可以获取异步执行的结果。

    ExecutorService还提供了管理和控制线程池的方法,例如:

    • shutdown():平滑地关闭ExecutorService,不再接受新任务,已提交的任务继续执行。
    • shutdownNow():尝试立即关闭ExecutorService,尝试停止所有正在执行的任务,不再处理队列中等待的任务,并返回等待执行的任务列表。
    • awaitTermination(long timeout, TimeUnit unit):阻塞当前线程直到ExecutorService中所有任务执行完毕,或者超时,或者当前线程被中断。

优缺点:

优点: - 简化资源管理:自动线程管理减少了创建和销毁线程的开销,提高了资源利用率。 - 提高响应性:可以通过线程池快速响应任务请求,尤其是在使用缓存线程池时。 - 增强灵活性:可以根据应用需求选择不同类型的线程池和任务执行策略。 - 任务调度:提供了一种机制来排队和调度任务,使得任务执行更加有序。

缺点: - 复杂性:对于初学者来说,合理配置和使用线程池可能会比较复杂。 - 资源消耗:如果不当使用或配置线程池,可能会导致资源浪费或性能瓶颈。 - 隐蔽的问题:错误的线程池使用可能导致难以发现的问题,如线程泄漏、任务拒绝等。

示例

一个简单的例子使用ExecutorService来提交任务:

package com.dereksmart.crawling.excutor;
import java.util.concurrent.*;
/**
 * @Author derek_smart
 * @Date 2024/7/30 7:55
 * @Description  Executor测试类
 */
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class ExecutorExample {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        // 创建一个固定大小的线程池
        ExecutorService executor = Executors.newFixedThreadPool(2);

        // 提交Runnable任务
        executor.execute(new Runnable() {
            public void run() {
                System.out.println("Derek task....");
            }
        });

        // 提交Callable任务并获取Future对象
        Future<String> future = executor.submit(() -> {
            Thread.sleep(1000);
            return "Hello Derek....";
        });
        while (!future.isDone()) {
            try {
                // 可以选择性地暂停一下,避免密集循环
                TimeUnit.MILLISECONDS.sleep(100);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                break;
            }
        }
        // 检查Future对象的状态和结果
        try {
            // 任务完成,获取结果
            String result = future.get();
            System.out.println(result);
        } catch (Exception e) {
            e.printStackTrace();
        }
        // 关闭ExecutorService
        executor.shutdown();
    }
}

掌握Java并发编程2:基本的任务执行框架-Executor、Executors和ExecutorService

在这个例子中,创建了一个固定大小的线程池,提交了一个Runnable任务和一个Callable任务。然后尝试获取Callable任务的结果,并在完成后关闭线程池。

使用java.util.concurrent包中的这些工具可以大大简化多线程编程的复杂性,使得开发者可以更专注于任务的实现,而不是线程的管理。

类图:

extends
creates
implements
extends
«interface»
Executor
+execute(Runnable command)
«interface»
ExecutorService
+submit(Callable task)
+invokeAny(Collection tasks)
+invokeAll(Collection tasks)
+shutdown()
+shutdownNow()
+isShutdown()
+isTerminated()
+awaitTermination(long timeout, TimeUnit unit)
Executors
+newFixedThreadPool(int nThreads)
+newCachedThreadPool()
+newSingleThreadExecutor()
+newScheduledThreadPool(int corePoolSize)
ThreadPoolExecutor
-corePoolSize
-maximumPoolSize
-keepAliveTime
-workQueue
+ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue workQueue)
ScheduledThreadPoolExecutor
+ScheduledThreadPoolExecutor(int corePoolSize)

CompletionService顺序获取并发任务结果

package com.dereksmart.crawling.excutor;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;
/**
 * @Author derek_smart
 * @Date 2024/7/30 8:56
 * @Description  CompletionService执行并发任务,并且按照任务完成的顺序处理它们的结果
 */
public class AdvancedExecutorExamples {
    public static void main(String[] args) {
        // 自定义线程池参数
        int corePoolSize = 2;
        int maximumPoolSize = 4;
        long keepAliveTime = 60;
        TimeUnit unit = TimeUnit.SECONDS;
        BlockingQueue<Runnable> workQueue = new LinkedBlockingQueue<>(10);
        // 使用有意义的线程名字的工厂
        ThreadFactory threadFactory = new ThreadFactory() {
            private final AtomicInteger threadNumber = new AtomicInteger(1);

            public Thread newThread(Runnable r) {
                Thread t = new Thread(r, "AdvancedExecutorExample-Pool-" + threadNumber.getAndIncrement());
                if (t.isDaemon()) t.setDaemon(false);
                if (t.getPriority() != Thread.NORM_PRIORITY) t.setPriority(Thread.NORM_PRIORITY);
                return t;
            }
        };

        ThreadPoolExecutor executor = new ThreadPoolExecutor(corePoolSize,
                maximumPoolSize,
                keepAliveTime,
                unit,
                workQueue,
                threadFactory,
                new ThreadPoolExecutor.CallerRunsPolicy() // 拒绝策略
        );

        // 使用CompletionService来按照任务完成的顺序处理结果
        CompletionService<String> completionService = new ExecutorCompletionService<>(executor);

        // 提交一系列Callable任务
        List<Future<String>> futures = new ArrayList<>();
        for (int i = 0; i < 5; i++) {
            int taskId = i;
            Future<String> future = completionService.submit(() -> {
                TimeUnit.SECONDS.sleep(taskId); // 模拟任务执行时间
                return "Task " + taskId + " completed.";
            });
            futures.add(future);
        }

        // 按照任务完成的顺序获取并处理结果
        for (int i = 0; i < futures.size(); i++) {
            try {
                // take()方法将阻塞,直到有任务完成
                Future<String> completedTask = completionService.take();
                String result = completedTask.get();
                System.out.println(result);
            } catch (InterruptedException e) {
                // 处理中断异常
                Thread.currentThread().interrupt();
                break;
            } catch (ExecutionException e) {
                // 处理任务执行异常
                Throwable cause = e.getCause();
                System.err.println("Task execution failed: " + cause.getMessage());
            }
        }

        // 关闭线程池
        shutdownAndAwaitTermination(executor);
    }

    // 优雅地关闭线程池的方法
    private static void shutdownAndAwaitTermination(ExecutorService pool) {
        pool.shutdown(); // 禁止提交新任务
        try {
            // 等待现有任务完成
            if (!pool.awaitTermination(60, TimeUnit.SECONDS)) {
                pool.shutdownNow();
                // 取消正在执行的任务
                // 等待任务响应中断
                if (!pool.awaitTermination(60, TimeUnit.SECONDS)) {
                    System.err.println("Pool did not terminate");
                }
            }
        } catch (InterruptedException ie) {
            // 重新取消当前线程进行中断响应
            pool.shutdownNow();
            Thread.currentThread().interrupt();
        }
    }
}

掌握Java并发编程2:基本的任务执行框架-Executor、Executors和ExecutorService

使用场景: - 并发执行多个任务:当需要并发执行多个独立的任务,并且希望能够快速地处理那些先完成的任务的结果时,这段代码非常有用。

资源有效管理:使用线程池可以避免为每个任务创建新线程的开销,提高资源利用效率,并且可以通过调整线程池参数来优化性能。

任务执行顺序控制:通过CompletionService,可以控制任务的执行顺序,即使任务是并发执行的,也可以按照完成顺序来处理结果。

长时间运行的应用:在长时间运行的应用中,优雅关闭线程池是很重要的,以确保应用退出前,所有的任务都已经完成,并且所有的资源都已经被正确释放。

大量短暂任务的执行:如果有大量的短暂任务需要执行,并且希望能够快速启动和响应,使用ThreadPoolExecutor的缓存线程池或者自定义参数可以提高效率。

计算密集型或IO密集型任务:根据任务的类型,可以通过调整线程池的大小来优化性能,例如,对于计算密集型任务,可能想要限制线程数以匹配CPU核心数;对于IO密集型任务,由于线程经常会被阻塞,可以配置更多的线程来提高吞吐量。

总结

通过本文了解了ExecutorExecutorsExecutorService在Java并发编程中的核心作用和使用方法。探讨了如何利用这些工具来创建线程池、提交任务以及管理线程池的生命周期。正确地使用这些并发工具,不仅可以提高程序的性能,还能提升代码的质量和维护性。随着对这些基础概念的掌握,Java开发者将能够构建出更加健壮、可靠且可伸缩的并发应用程序,为处理现代应用程序中的高并发需求打下坚实的基础。记住,优秀的并发设计不仅仅是关于线程的操作,更是关于对资源的合理管理和对任务执行流程的精细控制。

转载自:https://juejin.cn/post/7397028381961945139
评论
请登录