likes
comments
collection
share

线程间通信方式(2)

作者站长头像
站长
· 阅读数 86

前文了解了线程通信方式中的Object.wait/Object.notify以及Semaphore,接下来我们继续了解其他的线程间通信方式。

CountDownLatch

CountDownLatch利用一个指定的计数初始化,由于调用了countDown方法,await方法会阻塞直到当前技术为0,之后所有等待的线程都会被释放,任何后续的await都会立即返回。

CountDownLatch是一个通用的同步工具,可以用于许多目的。如果CountDownLatch初始化的计数为1,则可以用作简单的开/关锁存器或门,也就意味着所有调用await的线程都在门处等待,直到由调用countDown()的线程打开它。一个初始化为N的CountDownLatch可以用来让一个线程等待,直到N个线程完成了某个动作,或者某个动作完成了N次。

CountDownLatch核心接口说明如下:

方法名称描述备注
await()使当前线程进入等待状态,直到计数减为0或者当前线程被中断,否则不会唤醒/
await(long timeout, TimeUnit unit)等待timeout时间后,计数还没减成0,则等待线程唤醒继续执行,不再等待/
countDown()对计数减1,如果减到了0,就会唤醒等待的线程/
getCount()获取当前计数的值/

举个简单例子,某主任务中一共要发起5个子任务,等待所有任务完成后主任务继续,此时在主任务执行线程以计数取值为5初始化CountDownLatch,调用await等待,在每个子任务完成时,调用countDown方法使计数减1,等到5个子任务全部完成后,此时计数减为0,主任务唤醒,继续执行。

通过例子和描述可以看出CountDownLatch属于一次性操作,计数无法重置。

以主任务中发起5个子任务为例,使用CountDownLatch的代码实现如下:

 public static void main(String[] args) {
     Thread mainThread = new Thread(new Runnable() {
         @Override
         public void run() {
             System.out.println("MainThread start working");
             CountDownLatch countDownLatch = new CountDownLatch(5);
             try {
                 for (int i=0;i<5;i++) {
                     Thread subThread = new Thread(new Runnable() {
                         @Override
                         public void run() {
                             System.out.println(Thread.currentThread().getName()+" start working");
                             System.out.println(Thread.currentThread().getName()+" completed");
                             countDownLatch.countDown();
                         }
                     });
                     subThread.setName("SubThread"+(i+1));
                     subThread.start();
                 }
                 countDownLatch.await();
             } catch (InterruptedException e) {
                 e.printStackTrace();
             }
             System.out.println("MainThread completed");
         }
     });
     mainThread.setName("MainThread");
     mainThread.start();
 }

运行结果如下:

线程间通信方式(2)

对于CountDownLatch 而言,如果在计数为0前,有子任务发生异常导致退出,则所有等待的线程都会一致等待,直到超时时间来临,所以使用CountDownLatch一定要注意线程异常的处理。

CyclicBarrier

CyclicBarrier同样是一个多线程同步的工具类,译为循环栅栏,其允许一组线程互相等待到达一个同步的屏障点,CyclicBarrier在一组固定大小的线程中存在相互等待的场景下十分有用,之所以称为循环栅栏,是因为其在其他线程释放后可以重新使用。

CyclicBarrier使用一个指定计数和Runnable进行初始化,初始化完成后,当CyclicBarrier.await调用次数等于计数,也就是等待线程数等于计数时,则会触发初始化传入Runnable运行,该Runnable运行在最后一个进入等待的线程中,随后所有等待线程唤醒继续执行。

7位选手参加田径比赛,所有人在起点就位,准备完成后,发射信号枪起跑。用CyclicBarrier来实现比赛逻辑,则以计数7和运行发射信号枪的Runnable初始化CyclicBarrier,每一个选手起点准备完成后,都调用一次CyclicBarrier.await,当所有选手都准备完成,发射信号枪的Runnable运行,比赛开始,实现代码如下:

 public static void main(String[] args) {
     CyclicBarrier cyclicBarrier = new CyclicBarrier(7, new Runnable() {
         @Override
         public void run() {
             System.out.println("所有选手准备完成,发射信号枪,比赛开始,运行在:"+Thread.currentThread().getName());
         }
     });
     for (int i=0;i<7;i++) {
         int finalI = i;
         Thread thread = new Thread(new Runnable() {
             @Override
             public void run() {
                 System.out.println((finalI +1)+"号选手准备完成,运行在:"+Thread.currentThread().getName());
                 try {
                     cyclicBarrier.await();
                 } catch (InterruptedException e) {
                     e.printStackTrace();
                 } catch (BrokenBarrierException e) {
                     e.printStackTrace();
                 }
                 System.out.println((finalI +1)+"号选手准备达到准点,运行在:"+Thread.currentThread().getName());
             }
         });
         thread.setName("选手"+(i+1));
         thread.start();
     }
 }

输出如下:

线程间通信方式(2)

对于CyclicBarrier 而言,如果在所有线程都到达屏障陷入阻塞前,如果有线程发生异常导致未到达栅栏提前退出,则所有等待在栅栏都会以BrokenBarrierException或InterruptedException异常退出。

Lock,Condition与ReentrantLock

Lock通常译为锁,其是一种控制多个线程对共享资源访问的工具,与同步方法和语句相比,以接口定义,在其内部提供了更广泛的锁操作,其允许的关联结构更灵活,比如不同的属性,多个关联的Condition对象等。通常情况下,可以通过锁控制一次只有一个线程可以访问共享资源,所有线程在访问共享资源时都需要获取锁。部分锁也支持多共享资源进行并发访问,比如ReadWriteLock。

Lock接口核心函数如下表所示:

函数名称描述备注
lock()获取对象锁,如果锁不可用,则当前线程被阻塞,在获取锁前处于休眠状态/
unlock()释放锁,锁释放操作应处于finally块内,确保在任何情况下都能释放,以免造成死锁/
tryLock()锁是否可用,true-可用,false-不可用/

Condition通常译为条件,其将Object的wait,notify,notifyAll提取到不同的对象中,通过将这些对象与任一锁对象的使用结合起来,从而达到单个对象具有多个等待集的效果,在Lock+Condition的模式中,Lock代替了前文中Object.wait+synchronized中的synchronized,Condition代替了Object.wait/notify/notifyAll。

Condition核心函数如下表所示:

函数名称描述备注
await()当前线程进入等待状态,直到被通知或中断,线程从await方法返回进入运行状态的情况包括:1.其他线程调用了该Condition的signal或signalAll方法。2.其他线程中断当前线程。/
signal()唤醒一个等待在该Condition上的线程/
signalAll()唤醒所有等待在该Condition上的线程/

在实现上Lock和Condition均为接口,所以我们一般使用Lock的实现类ReentrantLock来实现锁机制,借助ReentrantLock对象内部的newCondition获得Condition的实现对象,进而搭配完成线程间通信,接下来我们使用ReentrantLock实现消费者-生产者模式,代码如下:

 private static int count = 0;
 public static void main(String[] args) {
     // 创建Lock对象
     ReentrantLock lock = new ReentrantLock();
     // 创建Condition对象
     Condition condition = lock.newCondition();
     // 创建盘子
     Counter counter = new Counter();
     Thread incThread = new Thread(new Runnable() {
         @Override
         public void run() {
             while (count < 4) {
                 lock.lock();
                 try {
                     if (counter.getCount() != 0) {
                         condition.await();
                     }
                     counter.incCount();
                     count++;
                     System.out.println("Inc Thread ++,current count is:" + counter.getCount());
                     condition.signalAll();
                 } catch (InterruptedException e) {
                     throw new RuntimeException(e);
                 } finally {
                     lock.unlock();
                 }
             }
         }
     });
     incThread.setName("Inc Thread");
     incThread.start();
 ​
     Thread decThread = new Thread(new Runnable() {
         @Override
         public void run() {
             while (count < 4) {
                 lock.lock();
                 try {
                     if (counter.getCount() == 0) {
                         condition.await();
                     }
                     counter.decCount();
                     System.out.println("Dec Thread --,current count is:" + counter.getCount());
                     condition.signalAll();
                 } catch (InterruptedException e) {
                     throw new RuntimeException(e);
                 } finally {
                     lock.unlock();
                 }
             }
         }
     });
     decThread.setName("Dec Thread");
     decThread.start();
 }

运行结果如下:

线程间通信方式(2)

前面提到过,Condition将Object的wait,notify,notifyAll提取到不同的对象中,也就意味着我们可以更灵活的控制锁获取,我们新建两个Condition对象,一个控制生产者,一个控制消费者,代码如下:

 private static int count = 0;
 public static void main(String[] args) {
     ReentrantLock lock = new ReentrantLock();
     // 控制生产者
     Condition incCondition = lock.newCondition();
     // 控制消费者
     Condition decCondition = lock.newCondition();
     Counter counter = new Counter();
     Thread incThread = new Thread(new Runnable() {
         @Override
         public void run() {
             while (count < 4) {
                 lock.lock();
                 try {
                     if (counter.getCount() != 0) {
                         // 阻塞当前生产线程
                         incCondition.await();
                     }
                     counter.incCount();
                     count++;
                     System.out.println("Inc Thread ++,current count is:" + counter.getCount());
                     // 唤醒消费者
                     decCondition.signalAll();
                 } catch (InterruptedException e) {
                     throw new RuntimeException(e);
                 } finally {
                     lock.unlock();
                 }
             }
         }
     });
     incThread.setName("Inc Thread");
     incThread.start();
 ​
     Thread decThread = new Thread(new Runnable() {
         @Override
         public void run() {
             while (count < 4) {
                 lock.lock();
                 try {
                     if (counter.getCount() == 0) {
                         // 阻塞当前消费线程
                         decCondition.await();
                     }
                     counter.decCount();
                     System.out.println("Dec Thread --,current count is:" + counter.getCount());
                     // 唤醒生产线程
                     incCondition.signalAll();
                 } catch (InterruptedException e) {
                     throw new RuntimeException(e);
                 } finally {
                     lock.unlock();
                 }
             }
         }
     });
     decThread.setName("Dec Thread");
     decThread.start();
 }

运行结果如下:

线程间通信方式(2)

当然ReentrantLock也可以不搭配Condition独立使用的,通过lock函数获取锁,通过unlock解锁,代码如下所示:

 public static void main(String[] args) {
     ReentrantLock reentrantLock = new ReentrantLock();
     ExecutorService testSynchronizedBlock = Executors.newCachedThreadPool();
     testSynchronizedBlock.execute(new Runnable() {
         @Override
         public void run() {
             System.out.println(Thread.currentThread().getName()+" enter lock first");
             reentrantLock.lock();
             System.out.println(Thread.currentThread().getName()+" enter lock");
             try {
                 Thread.sleep(2000);
             } catch (InterruptedException e) {
                 throw new RuntimeException(e);
             }
             System.out.println(Thread.currentThread().getName()+" enter lock again");
             reentrantLock.lock();
             try {
                 Thread.sleep(2000);
             } catch (InterruptedException e) {
                 throw new RuntimeException(e);
             }
             System.out.println(Thread.currentThread().getName()+" exit lock again");
             reentrantLock.unlock();
             System.out.println(Thread.currentThread().getName()+" exit lock");
             reentrantLock.unlock();
         }
     });
     testSynchronizedBlock.execute(new Runnable() {
         @Override
         public void run() {
             System.out.println(Thread.currentThread().getName()+" enter lock before");
             reentrantLock.lock();
             System.out.println(Thread.currentThread().getName()+" enter lock");
             try {
                 Thread.sleep(3000);
             } catch (InterruptedException e) {
                 throw new RuntimeException(e);
             }
             System.out.println(Thread.currentThread().getName()+" exit lock");
             reentrantLock.unlock();
         }
     });
 }

上述代码,在第一个代码中演示了ReentrantLock的可重入性,在使用并发锁相关工具类时,一定要注意获取锁和释放锁必须一一配对,在任何情况下都要确保能释放锁,以免造成死锁

转载自:https://juejin.cn/post/7171061067634704391
评论
请登录