likes
comments
collection
share

CyclicBarrier的使用

作者站长头像
站长
· 阅读数 20

允许任意数量的线程 await 等待,直到所有合作线程都达到某个点,然后所有线程从该点继续执行

构造函数

public CyclicBarrier(int parties) {
    this(parties, null);
}

public CyclicBarrier(int parties, Runnable barrierAction) {
    if (parties <= 0) throw new IllegalArgumentException();
    this.parties = parties;
    this.count = parties;
    this.barrierCommand = barrierAction;
}
  • parties 并行等待的线程个数
  • barrierAction 所有线程都执行等待后,需要运行的代码

方法


// 当前线程一直等待到 parties 个线程都调用了 await 方法
public int await() throws InterruptedException, BrokenBarrierException {
    try {
        return dowait(false, 0L);
    } catch (TimeoutException toe) {
        throw new Error(toe); // cannot happen
    }
}
// 当前线程等待到 parties 个线程都调用了 await 方法 或者超过 timeout 时间
public int await(long timeout, TimeUnit unit)
    throws InterruptedException,
           BrokenBarrierException,
           TimeoutException {
    return dowait(true, unit.toNanos(timeout));
}

// 线程等待是否被破坏了,比如超时,或者线程 interrupted
public boolean isBroken() {
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        return generation.broken;
    } finally {
        lock.unlock();
    }
}
// 重置,CyclicBarrier 可以重复利用,不用重新生成对象
public void reset() {
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        breakBarrier();   // break the current generation
        nextGeneration(); // start a new generation
    } finally {
        lock.unlock();
    }
}
// 获取还剩多少个线程没有执行到等待 await
public int getNumberWaiting() {
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        return parties - count;
    } finally {
        lock.unlock();
    }
}

使用示例

public class Main {
    public static final int PARTIES = 3;
    public static final int LEN = 5;
    private Integer[][] mData;
    CyclicBarrier mCyclicBarrier;

    public static void main(String[] args) {
        Main main = new Main();
        main.done();
    }

    private void done() {
        Random random = new Random();
        // 创建 CyclicBarrier 3个线程等待,执行等待后执行的 runnable
        mCyclicBarrier = new CyclicBarrier(PARTIES, () -> {
            // 分别打印 3 个 int 类型数组
            for (int i = 0; i < PARTIES; i++) {
                for (int j = 0; j < LEN; j++) {
                    System.out.print(mData[i][j] + " ");
                }
                System.out.println();
            }
        });
        // 二维数组,3个线程处理一个 5个 数字大小的排序
        mData = new Integer[PARTIES][LEN];

        for (int i = 0; i < PARTIES; i++) {
            mData[i] = new Integer[LEN];
            for (int j = 0; j < LEN; j++) {
                // 构建数据
                mData[i][j] = 100 * i + random.nextInt(50);
            }
            // 启动 3个 线程
            new Thread(new Worker(mData[i], mCyclicBarrier)).start();
        }
    }

    private static class Worker implements Runnable {


        Integer[] mData;
        CyclicBarrier mCyclicBarrier;

        public Worker(Integer[] data, CyclicBarrier cyclicBarrier) {
            mData = data;
            mCyclicBarrier = cyclicBarrier;
        }

        @Override
        public void run() {
            Arrays.sort(mData); // 进行排序
            System.out.println(Thread.currentThread().getName() + " : finish...");
            try {
                mCyclicBarrier.await();
                System.out.println(Thread.currentThread().getName() + " await over ...");
            } catch (InterruptedException e) {
                e.printStackTrace();
            } catch (BrokenBarrierException e) {
                e.printStackTrace();
            }
        }
    }
}

输出

Thread-0 : finish...
Thread-2 : finish...
Thread-1 : finish...
0 20 27 41 43 
102 124 126 136 148 
210 233 234 240 246 
Thread-1 await over ...
Thread-0 await over ...
Thread-2 await over ...

说明

  • 同步辅助工具,它允许一组线程全部互相等待以到达一个公共的障碍点。 CyclicBarriers在涉及固定大小的线程方的程序中很有用,该线程方有时必须互相等待。 该屏障称为循环屏障,因为它可以在释放等待线程之后重新使用。
  • CyclicBarrier支持可选的Runnable命令,该命令在障碍中的最后一个线程到达之后但在释放任何线程之前,每个障碍点运行一次。 此屏障操作对于在任何一方继续之前更新共享状态很有用
  • CyclicBarrier 对失败的同步尝试使用全无损坏模型:如果线程由于中断,失败或超时而过早离开障碍点,则所有其他在该障碍点等待的线程也会通过BrokenBarrierException(或InterruptedException)异常离开 他们也大约同时被打断了。
  • 内存一致性影响:调用await()之前的线程中的操作发生在 CyclicBarrier 操作的一部分之前,而其他操作又发生在其他线程中的相应await()成功返回之后的操作之前。