likes
comments
collection
share

并发编程-常见并发工具BlockingQueue的使用及原理解析

作者站长头像
站长
· 阅读数 16

线程的基础和使用 Synchronized原理分析 并发编程-探索可见性背后的本质以及vloatile原理 并发编程-死锁/ThreadLocal 并发编程-ReentrantLook底层设计 并发编程-Condition底层设计

Java中的阻塞队列是一种特殊类型的队列,它支持在队列为空或队列已满时自动阻塞等待。它是并发编程中常用的线程安全数据结构之一,用于在多线程环境下安全地传递数据。

Java提供了java.util.concurrent包中的BlockingQueue接口和几个实现类来实现阻塞队列,其中最常用的实现类是:

  • ArrayBlockingQueue:一个基于数组的有界阻塞队列。
  • LinkedBlockingQueue:一个基于链表的可选有界阻塞队列。
  • PriorityBlockingQueue:一个支持优先级排序的无界阻塞队列。
  • DelayQueue:一个支持延迟获取元素的无界阻塞队列。
  • SynchronousQueue:一个不存储元素的阻塞队列,用于线程间的直接传输。

阻塞队列的主要方法包括:

  • put(E element):将元素添加到队列的末尾,如果队列已满则阻塞等待。
  • take():移除并返回队列头部的元素,如果队列为空则阻塞等待。
  • offer(E element):将元素添加到队列的末尾,如果队列已满则返回false。
  • poll():移除并返回队列头部的元素,如果队列为空则返回null。

BlockingQueue的使用

package org.example;

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

public class ArrayBlockingQueueDemo {
    public static void main(String[] args) {
        // 创建一个容量为3的ArrayBlockingQueue
        BlockingQueue<String> queue = new ArrayBlockingQueue<>(3);

        // 创建生产者线程
        Thread producerThread = new Thread(() -> {
            try {
                // 生产者往队列中添加元素
                for (int i = 0; i < 5; i++) {
                    Thread.sleep(1);
                    queue.put("A"+i);
                    System.out.println("Producer added: A"+i);
                }
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        });

        // 创建消费者线程
        Thread consumerThread = new Thread(() -> {
            try {
                // 消费者从队列中取出元素
                for (int i = 0; i < 5; i++) {
                    Thread.sleep(1);
                    String item = queue.take();
                    System.out.println("Consumer removed: " + item);
                }
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        });

        // 启动生产者和消费者线程
        producerThread.start();
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
        consumerThread.start();
    }
}

如上代码,定义的阻塞队列大小是3,我们先启动生产者线程睡几秒后再启动消费者线程我看看到打印结果是生成到A3就停了等到消费者有产品被消费了此时又唤醒了生产者线程继续生产 并发编程-常见并发工具BlockingQueue的使用及原理解析

BlockingQueue源码分析

初始化源码

public ArrayBlockingQueue(int capacity, boolean fair) {
    if (capacity <= 0)
        throw new IllegalArgumentException();
    this.items = new Object[capacity];
    lock = new ReentrantLock(fair);
    notEmpty = lock.newCondition();
    notFull =  lock.newCondition();
}

发现用了ReentrantLockCondition,具体这两个类是做什么用的继续往下看

put源码分析

public void put(E e) throws InterruptedException {
    checkNotNull(e);
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();
    try {
        while (count == items.length)
            notFull.await();
        enqueue(e);
    } finally {
        lock.unlock();
    }
}

这里也很简单其实就是利用ReentrantLock加锁往阻塞队列中添加元素,如果此时满了就调用notFull.await()进行等待,然后将当前线程添加到notFullCondition队列中,没满则添加元素到阻塞队列enqueue

private void enqueue(E x) {
    // assert lock.getHoldCount() == 1;
    // assert items[putIndex] == null;
    final Object[] items = this.items;
    items[putIndex] = x;
    if (++putIndex == items.length)
        putIndex = 0;
    count++;
    notEmpty.signal();
}

这里是在添加到队列后需要调用notEmpty.signal();说明此时队列有东西,消费者Condition如果有线程此时会被唤醒,然后重新获得锁消费元素

take源码分析

public E take() throws InterruptedException {
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();
    try {
        while (count == 0)
            notEmpty.await();
        return dequeue();
    } finally {
        lock.unlock();
    }
}

发现和 put源码很像也是加锁移除元素,如果当前阻塞队列是0则调用notEmpty.await();,将当前线程添加到 notEmptyCondition队列中等待被唤起

BlockingQueue原理流程图

并发编程-常见并发工具BlockingQueue的使用及原理解析

转载自:https://juejin.cn/post/7235714313720397884
评论
请登录