likes
comments
collection

Java阻塞队列详解

作者站长头像
站长
· 阅读数 24

1.什么是阻塞队列

阻塞队列--BlockingQueue,它是一个接口,

public interface BlockingQueue<E> extends Queue<E>

BlcokingQueue继承了Queue接口,是队列的一种,Queue和BlockingQueue都是在Java5中加入的,BlockingQueue是线程安全的,我们在很多场景下都可以利用线程安全的队列来优雅地解决我们业务自身的线程安全问题。 主要并发队列关系图

Java阻塞队列详解 上图展示了Queue最主要的实现类,可以看出Java提供的线程安全的队列(也成为并发队列)主要分为阻塞队列和非阻塞队列俩大类。

  • 阻塞队列的典型例子就是BlockingQueue接口的实现类,主要有6种实现:ArrayBlcokingQueue,LinkedBlockingQueue,SynchronousQueue,DelayQueue,PriorityBlockingQueue和LinkedTransherQueue,它们各自有不同不同的特点。
  • 非阻塞并发队列最典型的就是ConcurrentLinkedQueue,这个类不会让线程阻塞,利用CAS保证了线程安全。
  • 从上图还可以看到Deque,就是双端队列,双端队列从头和尾都能添加和删除元素,而普通的Queue只能从一端进入,另一端出去。这是Queue和Deque最主要的区别,其它方面基本都差不多。

2.阻塞队列的特点

阻塞队列的特点就是阻塞两个字,哈哈哈哈。像是废话。阻塞功能使得生产者和消费者两端的能力得以平衡,当有任何一端速度过快时,阻塞队列便会把过快的速度降下来。最重要的两个方法:take()和put(); take() take方法的功能是获取并移除队列的头节点,通常在队列里有数据的时候是可以正常移除的,当队列里无数据,则阻塞,直到队列里面有数据。一旦有数据了,就立刻解除阻塞状态,并且获取到数据 put() put方法插入元素时,如果队列没有满可以正常插入,如果队列满了,则阻塞,直到队列里面有了空闲空间,就会消除阻塞状态,并把数据添加进去。

阻塞队列容量问题

阻塞队列分为两种有界和无界。无界队列意味着里面可以容纳非常多的元素,例如LinkedBlcokingQueue的上限是Integer.MAX_VALUE。学过Java的人都知道这个数其实挺大的。但有界队列例如ArrayBlockingQueue如果队列满了,也不会扩容。

3.阻塞队列常用方法

在阻塞队列中有很多方法,而且都非常相似,这里我把常用的8个方法总结了一下以添加、删除为主。主要分为三类:

  • 抛出异常:add、remove、element
  • 返回结果但是不抛出异常:offer、poll、peek、
  • 阻塞:take、put、

3.1 抛出异常:add、remove、element

add方法是往队列里面添加一个元素,如果队列满了,就会抛出异常来提示我们队列已满。

public static void main(String[] args) {
    BlockingQueue<Integer> blockingQueue=new ArrayBlockingQueue<>(2);
    blockingQueue.add(1);
    blockingQueue.add(1);
    blockingQueue.add(1);
}

这里我们指定队列容量为2,并且尝试加入3个值。超过了容量上限就会报IllegalStateException的错误。

Java阻塞队列详解 remove方法是删除元素,如果我们队列为空的时候又进行了删除操作,同样会报NoSuchElementException。

public static void main(String[] args) {
    BlockingQueue<Integer> blockingQueue=new ArrayBlockingQueue<>(2);
    blockingQueue.add(1);
    blockingQueue.add(1);
    blockingQueue.remove();
    blockingQueue.remove();
    blockingQueue.remove();
}

这里我们指定容量为2,并且添加两个元素,然后删除三个元素。结果如下

Java阻塞队列详解 element方法是返回队列的头节点,但是不会删除这个元素。当队列为空时同样会报NoSuchElementException的错误

public static void main(String[] args) {
    BlockingQueue<Integer> blockingQueue=new ArrayBlockingQueue<>(2);
    blockingQueue.element();
}

Java阻塞队列详解

3.2返回结果但是不抛出异常offer、poll、peek

offer方法用来插入一个元素,如果插入成功会返回true,如果队列满了,再插入元素不会抛出异常但是会返回false。

public static void main(String[] args) {
    BlockingQueue<Integer> blockingQueue=new ArrayBlockingQueue<>(2);
    System.out.println(blockingQueue.offer(1));
    System.out.println(blockingQueue.offer(1));
    System.out.println(blockingQueue.offer(1));
}

Java阻塞队列详解 poll方法和remove方法是对应的都是删除元素,都会返回删除的元素,但是当队列为空时则会返回null

public static void main(String[] args) {
    BlockingQueue<Integer> blockingQueue = new ArrayBlockingQueue<>(2);
    blockingQueue.offer(1);
    blockingQueue.offer(1);
    System.out.println(blockingQueue.poll());
    System.out.println(blockingQueue.poll());
    System.out.println(blockingQueue.poll());

}

Java阻塞队列详解 peek方法和element方法对应,返回队列的头节点但并不删除,如果队列为空则直接返回null

public static void main(String[] args) {
    BlockingQueue<Integer> blockingQueue = new ArrayBlockingQueue<>(2);
    System.out.println(blockingQueue.peek());

}

Java阻塞队列详解 带超时时间的offer和poll

offer(E e, long timeout, TimeUnit unit)

它有三个参数,分别是元素、超时时长和时间单位。通常情况下,这个方法会插入成功并且返回true;如果队列满了导致插入不成功,在调用带超时时间重载方法的offer的时候,则会等待指定的超时时间,如果到了时间依然没有插入成功,则返回false。

E poll(long timeout, TimeUnit unit)

这个带参数的poll和上面的offer类似。如果能够移除,便会立即返回这个节点的内容;如果超过了我们定义的超时时间依然没有元素可以移除,便会返回null作为提示。

3.2 阻塞put和take

put方法的作用是插入元素,通常在队列没有满的时候是正常插入。如果队列满了无法继续插入,这时它不会立刻返回false和抛出异常,而是让插入的线程进入阻塞状态,直到队列里面有空闲空间了。此时队列就会让之前的线程接触阻塞状态,并把刚才那个元素添加进去。 take方法的作用是获取并移除队列的头节点。通常队列里面有元素会正常取出数据并移除;但是如果执行take的时候队列里无数据,则阻塞,直到队列里面有数据以后,就会立即解除阻塞状态,并且取到数据。 搞了个总结可以看一下

Java阻塞队列详解

4.常见的阻塞队列

4.1 ArrayBlockingQueue

ArrayBlockingQueue是最典型的有界队列,其内部是用数组存储元素的,利用Reentrant实现线程安全

public ArrayBlockingQueue(int capacity) {
   this(capacity, false);
}

public ArrayBlockingQueue(int capacity, boolean fair) {
   if (capacity <= 0)
       throw new IllegalArgumentException();
   this.items = new Object[capacity];
   lock = new ReentrantLock(fair);
   notEmpty = lock.newCondition();
   notFull =  lock.newCondition();
}

我们在创建它的时候就需要指定它的容量,之后也不可以在扩容了,在构造函数中我们同样可以指定是否是公平的。如果ArrayBlockingQueue被设置为非公平的,那么就存在插队的可能;如果设置为公平的,那么等待了最长时间的线程会优先被处理,其它线程不允许插队。

4.2 LinkedBlockingQueue

LinkedBlockingQueue内部使用链表实现的,如果我们不指定它的初始容量,那么它的默认容量就为整形的最大值Integer.MAX_VALUE,由于这个数特别特别的大,所以它也被称为无界队列。

4.3 SynchronousQueue

SynchronousQueue最大的不同之处在于,它的容量不同,所以没有地方来暂存元素,导致每次取数据都要先阻塞,直到有数据放入;同理,每次放数据的时候也会阻塞,直到有消费者来取。SynchronousQueue的容量不是1而是0,因为SynchronousQueue不需要去持有元素,它做的就是直接传递。

4.4 PriorityBlockingQueue

PriorityBlockingQueue是一个支持优先级的无界阻塞队列,可以通过自定义类实现compareTo()方法来制定元素排序规则,或者初始化时通过构造器参数Comparator来制定排序规则。同时,插入队列的对象必须是可比较大小的,也就是Comparable的,否则就会抛出ClasscastException异常。 它的take方法在队列为空时会阻塞,但是正因为它是无界队列,而且会自动扩容,所以它的队列永远不会满,所以它的put()方法永远不会阻塞,添加操作始终都会成功。

4.5 DelayQueue

Delay这个队列比较特殊,具有延迟的功能,我们可以设定在队列中的任务延迟多久之后执行。它是无界队列,但是放入的元素必须实现Delayed接口,而Delayed接口又继承了Comparable接口,所以自然就拥有了比较和排序的能力.

public interface Delayed extends Comparable<Delayed> {

    /**
     * Returns the remaining delay associated with this object, in the
     * given time unit.
     *
     * @param unit the time unit
     * @return the remaining delay; zero or negative values indicate
     * that the delay has already elapsed
     */
    long getDelay(TimeUnit unit);
}

可以看出这个Delayed接口继承自Comparable,里面需要涉嫌getDelay.这里的getDelay方法返回的是还剩下多长的延迟时间才会被执行。如果返回0或者负数则代表任务已过期。元素会根据延迟时间的长短被放到队列的不同位置,越靠近队列头代表越早过期。

5 阻塞和非阻塞队列的并发安全原理是什么

5.1阻塞队列-以ArrayBlockingQueue为例

先来看一下ArrayBlockingQueue的几个重要属性

//用于存放元素的数组
final Object[] items;
//下一次读取的位置
int takeIndex;
// 下一次写入的位置
int putIndex;
// 队列中元素的数量
int count;
/*
 * 用于控制并发的工具类
 */
/** Main lock guarding all access */
final ReentrantLock lock;
/** Condition for waiting takes */
private final Condition notEmpty;
/** Condition for waiting puts */
private final Condition notFull;

ArrayBlockingQueue实现并发同步的原理就是利用ReentrantLock和它的俩个Condition,读操作和写操作都需要先获取到ReentrantLock独占锁才能进行下一步操作。进行读操作时如果队列为空,线程就会进入到读线程专属的notEmpty的Condition的队列中去排队,等待写线程写入新的元素;同理队列已满,这个时候写操作的线程进入到写线程专属的notFull队列中去排队,等待读线程将队列元素移除并腾出空间。 看一下ArrayBlockingQueue的put方法。

public void put(E e) throws InterruptedException {
    checkNotNull(e);
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();
    try {
        while (count == items.length)
            notFull.await();
        enqueue(e);
    } finally {
        lock.unlock();
    }
}

在put方法中,首先用checkNotNull方法检查插入的元素是不是null,如果不是null,我们会用Reentrantlock上锁,并且使用的上锁方法是lock.lockInterruptibly()。这个方法在获取锁的同时是可以相应中断的,这也正是我们的阻塞队列调用put方法时,在尝试获取锁但还没拿到锁的期间可以响应中断的底层原因。紧接着在while循环中,它会检查当前队列是不是已经满了,也就是count的长度是否等于数组长度。如果等于代表已经满了,于是我们便会进行等待,直到有空余的时候,我们才会执行下一步操作,调用enqueue方法让元素进入队列,最后用unlock方法解锁。

5.2 非阻塞队列ConcurrentLinkedQueue

public boolean offer(E e) {
    checkNotNull(e);
    final Node<E> newNode = new Node<E>(e);

    for (Node<E> t = tail, p = t;;) {
        Node<E> q = p.next;
        if (q == null) {
            // p is last node
            if (p.casNext(null, newNode)) {
                // Successful CAS is the linearization point
                // for e to become an element of this queue,
                // and for newNode to become "live".
                if (p != t) // hop two nodes at a time
                    casTail(t, newNode);  // Failure is OK.
                return true;
            }
            // Lost CAS race to another thread; re-read next
        }
        else if (p == q)
            // We have fallen off list.  If tail is unchanged, it
            // will also be off-list, in which case we need to
            // jump to head, from which all live nodes are always
            // reachable.  Else the new tail is a better bet.
            p = (t != (t = tail)) ? t : head;
        else
            // Check for tail updates after two hops.
            p = (p != t && t != (t = tail)) ? t : q;
    }
}

通过源码我们可以清晰的看出,非阻塞队列ConcurrentLinkedQueue它的主要流程就是在判空以后,会进入一个大循环中,p.casNext()方法,这个方法正是利用了CAS来操作的,这个死循环去配合CAS其实就是我们平时说的乐观锁的思想。其实非阻塞队列ConcurrentLickedQueue使用CAS非阻塞算法+不停重试的实际来实现线程安全的

6 线程池对于阻塞队列的选择

Java阻塞队列详解

  • FixedThreadPool选取的是LinkedBlcokingQueue(同理SingleThreadExecutor) 首先我们知道LinkedBlockingQueu默认是无限长的,而FixedThreadPool的线程数是固定的,当核心线程数都在被使用时,这个时候如果进来新的任务会被放进阻塞队列中。由于队列是没有容量上限的,队列永远不会被填满,这样就保证了线程池FixedThreadPool和SingleThreadExecutor,不会拒绝新任务的提交,也不会丢失数据。
  • CachedThreadPool选取的是SynchronousQueue 首先CachedThreadPool的线程最大数量是无限的,也就意味着它的线程数不会受限制,那么它就不需要额外的空间来存储那些Task,因为每个任务都可以通过新建线程来处理。SynchronousQueue会直接把任务交给线程,不保存它们,效率更好。
  • ScheduledThreadPool选取的是延迟队列
  • 对于ScneduledThreadPool而言,它使用的是DelayedWorkQueue,延迟队列的特点是:不是先进先出,而是会按照延迟时间的长短来排序,下一个即将执行的任务会排到队列的最前面。选择使用延迟队列的原因是,ScheduledThreadPool处理的是基于时间而执行的Task,而延迟队列有能力把Task按照执行时间的