likes
comments
collection
share

掌握Java并发编程3:同步工具类-Future、Semaphore、CountDownLatch、CyclicBarrier、Exchanger和Phaser

作者站长头像
站长
· 阅读数 23

引言

概述

在现代软件开发中,多线程和并发编程是提高应用性能和效率的关键技术之一。Java平台通过java.util.concurrent包提供了一套强大的并发工具类,它们可以帮助开发者有效地管理和协调线程之间的交互。本文将深入探讨这些工具类的原理、特点和使用场景,包括Future, Semaphore, CountDownLatch, CyclicBarrier, Exchanger, 和 Phaser

正文

Future

原理

Future 接口在 Java 中代表一个异步计算的结果。当提交一个任务给 ExecutorService 执行时,会得到一个实现了 Future 接口的对象。通过这个 Future 对象,可以检查任务是否已经完成、等待任务完成并获取计算结果,或者取消任务的执行。

使用场景

  • 异步执行长时间运行的计算任务,而不阻塞主线程。
  • 在多个任务并发执行时,获取每个任务的结果。
  • 取消尚未完成的任务。

Semaphore

原理

Semaphore 是一个计数信号量,用于管理一组资源的访问。信号量维护了一组许可(permits),线程可以通过调用 acquire() 方法来获取许可,如果没有可用许可,acquire() 会阻塞直到有许可被释放。线程完成资源的使用后,应该调用 release() 方法来释放许可。

使用场景

  • 控制资源的并发访问,例如限制同时访问文件的线程数量。
  • 实现生产者-消费者模式,其中许可代表可用的资源。

CountDownLatch

原理

CountDownLatch 是一个同步辅助类,它允许一个或多个线程等待一组事件发生。CountDownLatch 初始化时带有一个计数器,调用 countDown() 方法会使计数器减一,调用 await() 方法的线程会阻塞直到计数器达到零。

使用场景

  • 等待服务的一组操作初始化完成后,再继续执行。
  • 同时开始执行一组操作,如在并行测试中启动多个线程执行任务。

CyclicBarrier

原理

CyclicBarrier 类似于 CountDownLatch,但它可以重复使用。CyclicBarrier 维护一个计数器,当一组线程都到达某个屏障点(barrier point)时,计数器会归零,然后所有等待的线程会被释放,并且屏障被重置以便下一次使用。

使用场景

  • 同步一组线程的进度,确保它们在继续执行之前都到达了某个公共点。
  • 实现多阶段计算,每个阶段都需要等待所有线程完成。

Exchanger

原理

Exchanger 是一个同步点,在这个点上,两个线程可以交换数据。每个线程在到达同步点时提供一些数据给 Exchanger,并且接收另一个线程提供的数据。

使用场景

  • 在两个线程之间交换数据,例如在遗传算法或管道设计中。

Phaser

原理

Phaser 是一个可重用的同步辅助类,它将 CyclicBarrierCountDownLatch 的功能结合起来。它支持动态地注册线程,线程可以分阶段地到达,并且在每个阶段结束时同步。

使用场景

- 同步可变数量的线程执行分阶段的任务。 - 当任务被分为多个步骤执行,并且每个步骤必须等待之前步骤中的所有线程完成后才能开始。

Future

Future接口代表了一个异步计算的结果。它提供了一种检查计算是否完成、等待计算完成并检索其结果的机制。通过将耗时任务提交给ExecutorService,可以获得一个Future对象,并继续执行其他任务,直到需要结果。使用Future.get()可以获取结果,但这是一个阻塞调用;如果计算尚未完成,调用者将等待。为了避免阻塞,可以使用Future.isDone()来检查计算是否完成。

package com.dereksmart.crawling.future;

import java.util.concurrent.*;
/**
 * @Author derek_smart
 * @Date 2024/8/1 8:15
 * @Description  Future测试类
 */
public class FutureExample {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        Future<Integer> future = executor.submit(() -> {
            TimeUnit.SECONDS.sleep(1); // 模拟长时间运行的任务
            return 42; // 返回异步计算的结果
        });

        // 在这里可以做一些其他的事情,而计算仍在异步进行
        System.out.println("异步计算的结果是1: " + future.isDone());
        // 等待异步计算的结果
        Integer result = future.get(); //这里会阻塞直到任务完成
        System.out.println("异步计算的结果是2: " + future.isDone());
        System.out.println("异步计算的结果是: " + result);

        executor.shutdown();
    }
}

掌握Java并发编程3:同步工具类-Future、Semaphore、CountDownLatch、CyclicBarrier、Exchanger和Phaser

Semaphore

Semaphore是一个计数信号量,常用于控制对资源的并发访问。它维护了一组许可证,线程在访问资源之前必须先获得许可证,使用完资源后释放许可证。如果请求许可证时没有可用的许可证,则线程阻塞直到有许可证被释放。这是一种实现生产者-消费者模式的有效方式。

package com.dereksmart.crawling.future;

import java.util.concurrent.*;
/**
 * @Author derek_smart
 * @Date 2024/8/1 8:26
 * @Description  Semaphore测试类
 */
public class SemaphoreExample {
    private static final int MAX_PERMITS = 3;
    private final Semaphore semaphore = new Semaphore(MAX_PERMITS);

    public void accessResource(int threadId) {
        try {
            semaphore.acquire(); // 获取许可
            System.out.println("线程 " + threadId + " 访问资源.");
            TimeUnit.SECONDS.sleep(2); // 模拟资源访问
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            semaphore.release(); // 释放许可
            System.out.println("线程 " + threadId + " 释放资源.");
        }
    }

    public static void main(String[] args) {
        SemaphoreExample example = new SemaphoreExample();
        ExecutorService executor = Executors.newCachedThreadPool();

        for (int i = 1; i <= 3; i++) {
            final int threadId = i;
            executor.execute(() -> example.accessResource(threadId));
        }

        executor.shutdown();
    }
}

掌握Java并发编程3:同步工具类-Future、Semaphore、CountDownLatch、CyclicBarrier、Exchanger和Phaser

CountDownLatch

CountDownLatch是一种同步辅助工具,它允许一个或多个线程等待一组事件发生。它通过一个计数器来实现,该计数器初始化时设置为需要等待的事件数。当一个事件发生时,计数器减一。调用CountDownLatch.await()的线程会阻塞,直到计数器变为零。

package com.dereksmart.crawling.future;

import java.util.concurrent.*;
/**
 * @Author derek_smart
 * @Date 2024/8/1 8:31
 * @Description  CountDownLatch测试类
 */
public class CountDownLatchExample {
    private static final int NUMBER_OF_THREADS = 5;

    public static void main(String[] args) throws InterruptedException {
        CountDownLatch latch = new CountDownLatch(NUMBER_OF_THREADS);
        ExecutorService executor = Executors.newFixedThreadPool(NUMBER_OF_THREADS);

        for (int i = 0; i < NUMBER_OF_THREADS; i++) {
            executor.submit(() -> {
                System.out.println("线程 " + Thread.currentThread().getId() + " 执行任务.");
                try {
                    Long s = (long) (Math.random()*1000);
                    Thread.sleep(s);
                    System.out.println("线程 " + Thread.currentThread().getId() + " 执行任务."+s+"s");
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                latch.countDown(); // 通知 CountDownLatch 一个线程已经完成了任务
            });
        }

        latch.await(); // 等待直到所有线程完成任务
        System.out.println("所有线程已完成任务.");

        executor.shutdown();
    }
}

掌握Java并发编程3:同步工具类-Future、Semaphore、CountDownLatch、CyclicBarrier、Exchanger和Phaser

CyclicBarrier

CyclicBarrierCountDownLatch类似,但它可以重置并重复使用。CyclicBarrier被一组固定数量的线程使用,这些线程必须等待彼此到达一个共同的屏障点。当所有线程都到达屏障时,可以选择运行一个屏障动作。CyclicBarrier适用于分步骤执行的并行任务。

package com.dereksmart.crawling.future;

import java.util.concurrent.*;
/**
 * @Author derek_smart
 * @Date 2024/8/1 8:35
 * @Description CyclicBarrier测试类
 */
public class CyclicBarrierExample {
    private static final int NUMBER_OF_THREADS = 5;

    public static void main(String[] args) {
        CyclicBarrier barrier = new CyclicBarrier(NUMBER_OF_THREADS, () -> {
            // 这个runnable会在所有线程都到达屏障后执行
            System.out.println("所有线程都到达了屏障点.");
        });
        ExecutorService executor = Executors.newFixedThreadPool(NUMBER_OF_THREADS);

        for (int i = 0; i < NUMBER_OF_THREADS; i++) {
            executor.submit(() -> {
                System.out.println("线程 " + Thread.currentThread().getId() + " 正在工作.");
                try {
                    TimeUnit.SECONDS.sleep(2); // 模拟工作
                    barrier.await(); // 等待其他线程
                    System.out.println("线程 " + Thread.currentThread().getId() + " 通过了屏障.");
                } catch (InterruptedException | BrokenBarrierException e) {
                    Thread.currentThread().interrupt();
                }
            });
        }

        executor.shutdown();
    }
}

掌握Java并发编程3:同步工具类-Future、Semaphore、CountDownLatch、CyclicBarrier、Exchanger和Phaser

Exchanger

Exchanger允许两个线程在一个同步点交换数据。每个线程在到达同步点时提供一些数据,并接收另一个线程提供的数据。这是一个双向的同步交换,常用于遗传算法、管道设计等需要线程间协作的场景。

package com.dereksmart.crawling.future;

import java.util.concurrent.*;
/**
 * @Author derek_smart
 * @Date 2024/8/1 8:45
 * @Description Exchanger测试类
 */
public class ExchangerExample {
    private static final Exchanger<String> exchanger = new Exchanger<>();

    public static void main(String[] args) {
        ExecutorService executor = Executors.newFixedThreadPool(2);

        executor.execute(() -> {
            try {
                String message = "Hello from Thread A";
                String response = exchanger.exchange(message); // 交换数据
                System.out.println("Thread A received: " + response);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();    }
        });

        executor.execute(() -> {
            try {
                String message = "Hello from Thread B";
                String response = exchanger.exchange(message); // 交换数据
                System.out.println("Thread B received: " + response);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });


        executor.execute(() -> {
            try {
                String message = "Hello from Thread C";
                String response = exchanger.exchange(message); // 交换数据
                System.out.println("Thread C received: " + response);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });

        executor.execute(() -> {
            try {
                String message = "Hello from Thread D";
                String response = exchanger.exchange(message); // 交换数据
                System.out.println("Thread D received: " + response);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });

        executor.shutdown();
    }
}

掌握Java并发编程3:同步工具类-Future、Semaphore、CountDownLatch、CyclicBarrier、Exchanger和Phaser

Phaser

Phaser是一个可重用的同步辅助工具,它结合了CyclicBarrierCountDownLatch的功能。它可以处理动态数量的线程参与多阶段任务的同步。每个阶段结束时,Phaser等待预定数量的线程到达,然后前进到下一个阶段。

package com.dereksmart.crawling.future;

import java.util.concurrent.*;
/**
 * @Author derek_smart
 * @Date 2024/8/1 8:51
 * @Description Phaser测试类
 */
public class PhaserExample {
    private static final int NUMBER_OF_PARTIES = 3;
    private static final int PHASES_TO_COMPLETE = 3;

    public static void main(String[] args) {
        final Phaser phaser = new Phaser(NUMBER_OF_PARTIES) {
            @Override
            protected boolean onAdvance(int phase, int registeredParties) {
                System.out.println("阶段 " + phase + " 完成.");
                return phase >= PHASES_TO_COMPLETE - 1 || registeredParties == 0;
            }
        };

        ExecutorService executor = Executors.newFixedThreadPool(NUMBER_OF_PARTIES);

        for (int i = 0; i < NUMBER_OF_PARTIES; i++) {
            final int threadId = i;
            executor.execute(() -> {
                for (int phase = 0; phase < PHASES_TO_COMPLETE; phase++) {
                    System.out.println("线程 " + threadId + " 正在执行阶段 " + phase);
                    phaser.arriveAndAwaitAdvance(); // 等待其他线程完成当前阶段
                }
            });
        }

        executor.shutdown();
    }
}

掌握Java并发编程3:同步工具类-Future、Semaphore、CountDownLatch、CyclicBarrier、Exchanger和Phaser

这些并发工具类为开发者提供了强大的机制来管理复杂的多线程场景。正确使用这些工具可以大大提高应用程序的并发性能和响应能力,同时保证数据的一致性和线程的安全。然而,多线程编程仍然充满挑战,开发者需要仔细设计他们的程序以避免死锁、竞态条件和其他并发相关问题。

实战演练

考虑一个真实的使用场景,比如一个并行数据处理系统,它需要执行以下步骤:

  1. 从不同的数据源并行加载数据。
  2. 当所有数据加载完毕后,执行一些数据预处理。
  3. 分批处理数据,并在每批处理完成后进行一些清理工作。
  4. 每批数据处理完毕后,交换处理结果以便进行下一步的聚合处理。
  5. 所有数据处理完成后,执行最终的数据聚合和清理工作。

在这个场景中,我们可以使用CountDownLatch来同步数据加载,CyclicBarrier来同步批处理,Exchanger来交换数据,Phaser来管理多阶段任务,以及Semaphore来限制同时进行的处理任务数量。以下是对应的代码实现:

代码实现

package com.dereksmart.crawling.future;

import java.util.concurrent.*;

/**
 * @Author derek_smart
 * @Date 2024/8/1 8:35
 * @Description 并行数据处理系统
 */
public class DataProcessingSystem {
    private static final int NUMBER_OF_SOURCES = 3;
    private static final int NUMBER_OF_BATCHES = 5;
    private final CountDownLatch dataLoadedLatch = new CountDownLatch(NUMBER_OF_SOURCES);
    private final CyclicBarrier batchProcessingBarrier = new CyclicBarrier(NUMBER_OF_SOURCES);
    private final Exchanger<String>[] exchangers = new Exchanger[NUMBER_OF_SOURCES / 2]; // For an even number of sources
    private final Semaphore processingSemaphore = new Semaphore(2); // Limit concurrent processing

    public DataProcessingSystem() {
        for (int i = 0; i < exchangers.length; i++) {
            exchangers[i] = new Exchanger<>();
        }
    }

    public void loadData() throws InterruptedException ,Exception{
        ExecutorService executorService = Executors.newFixedThreadPool(NUMBER_OF_SOURCES);
        for (int i = 0; i < NUMBER_OF_SOURCES; i++) {
            final int sourceId = i;
            executorService.submit(() -> {
                System.out.println("Loading data from source " + sourceId);
                // Simulate data loading
                try {
                    TimeUnit.SECONDS.sleep(2);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                dataLoadedLatch.countDown();
            });
        }
        dataLoadedLatch.await(); // Wait for all data to be loaded
        System.out.println("Data loaded from all sources. Starting processing...");
        executorService.shutdown(); // Shutdown executor after all tasks are submitted
    }

    public void processData() throws InterruptedException, BrokenBarrierException ,Exception{
        ExecutorService executorService = Executors.newFixedThreadPool(NUMBER_OF_SOURCES);
        Phaser finalAggregationPhaser = new Phaser(1); // Register self

        for (int i = 0; i < NUMBER_OF_SOURCES; i++) {
            final int sourceId = i;
            final String initialData = "Data from source " + sourceId; // Loaded data
            finalAggregationPhaser.register(); // Register thread with the phaser

            executorService.submit(() -> {
                String data = initialData;
                try {
                    for (int batch = 0; batch < NUMBER_OF_BATCHES; batch++) {
                        processingSemaphore.acquire(); // Acquire permit to limit concurrent processing
                        System.out.println("Processing batch " + batch + " from source " + sourceId);
                        // Simulate batch processing
                        TimeUnit.SECONDS.sleep(1);
                        processingSemaphore.release(); // Release permit

                        // Exchange data with another source if possible
                        if (sourceId < exchangers.length) {
                            int pairId = (sourceId + 1) % 2;
                            data = exchangers[pairId].exchange(data);
                            System.out.println("Source " + sourceId + " exchanged data with source " + pairId);
                        }

                        // Wait for all sources to complete the batch
                        batchProcessingBarrier.await();
                    }
                } catch (InterruptedException | BrokenBarrierException e) {
                    Thread.currentThread().interrupt();
                } finally {
                    finalAggregationPhaser.arriveAndDeregister(); // Deregister when done
                }
            });
        }

        executorService.shutdown(); // Shutdown executor after all tasks are submitted
        executorService.awaitTermination(1, TimeUnit.HOURS); // Wait for all tasks to complete

        // Wait for final aggregation to complete and perform any additional finalization
        finalAggregationPhaser.arriveAndAwaitAdvance();
        System.out.println("Final aggregation and cleanup complete. System shutdown.");
    }

    public static void main(String[] args) throws InterruptedException, BrokenBarrierException,Exception {
        DataProcessingSystem system = new DataProcessingSystem();
        system.loadData();
        system.processData();
    }
}

掌握Java并发编程3:同步工具类-Future、Semaphore、CountDownLatch、CyclicBarrier、Exchanger和Phaser

在这个例子中:

  • CountDownLatch用于等待所有数据源加载完毕。
  • CyclicBarrier用于同步每批数据处理的结束,以便进行清理工作。
  • Exchanger用于在数据源之间交换处理结果。
  • Phaser用于在所有数据处理完毕后进行最终的聚合和清理工作。
  • Semaphore用于限制同时执行的数据处理任务数量,以防止系统过载。

时序图:

MainExecutor ServiceCountDownLatchCyclicBarrierSemaphoreExchangerPhaserloop[For each datasource]loop[For each data source]loop[For each data source]loadData()countDown()await() until count reaches 0Data loaded from all sourcesprocessData()acquire()await() for batch processingrelease()exchange(data) (if applicable)await() for next batcharriveAndAwaitAdvance()await() until all parties arriveFinal aggregation and cleanup completeMainExecutor ServiceCountDownLatchCyclicBarrierSemaphoreExchangerPhaser

这个例子展示了如何在复杂的数据处理系统中使用java.util.concurrent包下的同步工具类来协调多线程任务的执行。这些工具类的组合使用可以有效地管理线程间的同步和协作,从而提高系统的并发性能和稳定性。

总结

并发编程中,java.util.concurrent包提供了一系列强大的同步工具类,使得开发者能够更有效地管理和协调多线程间的交互。以下是这些工具类的简要概述:

Future: 表示异步计算的结果。它允许你启动一个可能耗时的计算,继续执行其他任务,然后在将来的某个时间点取得计算结果。

Semaphore: 计数信号量,用于控制对共享资源的并发访问。它维护一组许可证,线程在访问资源之前必须获取许可证,使用完资源后释放许可证。

CountDownLatch: 一个同步辅助类,允许一个或多个线程等待一组事件的完成。它通过一个计数器实现,该计数器在事件完成时递减,直到计数器为零时释放所有等待的线程。

CyclicBarrier: 类似于CountDownLatch,但它可以重置并重复使用。它用于同步一组线程,使它们在继续执行之前等待彼此到达一个共同的屏障点。

Exchanger: 允许两个线程在一个同步点交换数据。这是一个双向同步操作,通常用于需要线程间协作的场景。

Phaser: 一个可重用的同步辅助类,结合了CyclicBarrierCountDownLatch的功能。它可以处理动态数量的线程参与多阶段任务的同步。

这些工具类具有不同的用途和应用场景,它们可以帮助开发者解决复杂的并发问题,如数据同步、线程间通信、流程控制等。正确使用这些工具可以提高程序的并发性能和响应能力,同时保证数据的一致性和线程的安全。

转载自:https://juejin.cn/post/7398050410806280218
评论
请登录