【多线程04】你好,BlockingQueue!
在多线程03文章里,简单的描述了线程池的几个参数,其中提到了一个没有容量的特殊队列SynchronousQueue
,其实现了BlockingQueue
接口。当时只是简单的提及了一下,接下来就系统的学习一下BlockingQueue
。本文主要有以下内容:
- BlockingQueue接口介绍
- ArrayBlockingQueue
- LinkedBlockingQueue
- SynchronousQueue
在java.lang.concurrent
包下,定义了许多和并发相关的容器类,如Concurrent
开头的类ConcurrentHashMap、ConcurrentLinkedQueue
,亦或是以CopyOnWrite
开头的类,如CopyOnWriteArrayList
,包括BlockingQueue
接口的实现类。这些都是巨佬Doug Lea
给我们提供好的一些并发容器,在没有这些容器之前,我们通常需要自己去处理线程同步、异步、资源的获取和释放等各种各样的并发问题。如今只需要学会这些用法即可。本文主要内容为BlockingQueue
相关的实现类,其他的并发容器会在后续博客介绍。
BlockingQueue
BlockingQueue是一个阻塞队列,典型的使用场景就是在生产者和消费者模式中,充当一个数据缓冲区,用于存储生产者生产的"数据"或者供消费者"拿取数据"的地方,就相当于一个仓库。队列是一种先进先出的数据结构。
BlockingQueue接口中定义了如下四类方法:
功能\类别 | Throw exception(抛异常) | Special value(特定值) | Blocks(阻塞) | time out(超时) |
---|---|---|---|---|
insert(插入) | add(e) | offer(e) | put(e) | offer(e,time,unit) |
remove(删除) | remove() | poll() | take() | poll(time,unit) |
Examine(检查) | element() | peek() | / | / |
使用BlockingQueue时,不能添加null值,在使用插入相关的api时,如果添加的元素为null,则将抛出空指针异常。null 在BlockingQueue中有特殊的含义,当进行poll()
操作时(该方法返回队列的头节点,并删除,如果超过等待的时间则返回null),该方法有可能返回null值,因此如果允许添加null元素,则null的含义将不明确!
BlockingQueue 如果没有设置容量,则容量为 Integer.MAX_VALUE。如果指定了队列的容量则可以通过remainingCapacity()
方法获取容量,如果还有空间则可以直接放入,反之则不能。
BlockingQueue的实现类都是线程安全的,所有的方法都通过内部锁或者其他方式保证了线程安全。
BlockingQueue的主要实现类有如下图:
在本文中,关注的重点实现类有:
- ArrayBlockingQueue
- LinkedBlockingQueue
- SynchronousQueue
接下来将按照顺序将这三个实现类抽丝剥茧!
ArrayBlockingQueue
ArrayBlockingQueue是一个基于数组实现的有界队列,一旦创建将无法改变容量,当容量已满时,往数组中添加元素将会阻塞,当数组为空时,从数组中取出数据将阻塞。
內部是一个Object的数组,维护了takeIndex putIndex count这三个变量用于维护对应的取值、存值和队列中当前元素的个数操作。
- takeIndex:items index for next take, poll, peek or remove
- putIndex:items index for next put, offer, or add
- count:Number of elements in the queue
通过Condition对象notEmpty
、notFull
来进行同步控制!
入队操作
add()
、offer()
、put()
这三个方法的异同:
相同点:这三个方法都是将一个元素插入队列,
不同点:
add()
: 是通过调用父类的add()
实现的,插入成功将返回true
,插入失败将抛出IllegalStateException
或者NullPointerException
,前者是队列已满,后者是插入的元素为null时抛出的异常。
public boolean add(E e) {
return super.add(e);
}
offer()
:通过enqueue()
方法将元素插入队列中,队列已满将插入失败这样会返回false
,如果元素为 null
也会抛出NullPointerException
,该方法优于add()
public boolean offer(E e) {
checkNotNull(e);
final ReentrantLock lock = this.lock;
lock.lock();
try {
if (count == items.length)
return false;
else {
enqueue(e);
return true;
}
} finally {
lock.unlock();
}
}
put()
:从方法的实现就可以看出,如果队列已满,方法将阻塞,直到被唤醒将元素插入后,才会释放锁资源。如果元素为null将抛出NullPointerException
,或者在获取锁资源的过程中抛出InterruptedException
。
public void put(E e) throws InterruptedException {
checkNotNull(e);
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
while (count == items.length)
notFull.await();
enqueue(e);
} finally {
lock.unlock();
}
}
enqueue()
:私有方法,将元素插入到队列中,putIndex+1,count+1。
private void enqueue(E x) {
// assert lock.getHoldCount() == 1;
// assert items[putIndex] == null;
final Object[] items = this.items;
items[putIndex] = x;
if (++putIndex == items.length)
putIndex = 0;
count++;
notEmpty.signal();
}
出队操作
take()
和poll(time,unit)
都来自于BlockingQueue接口,均会从队列中删除返回的头节点,前者会一直阻塞到节点可以,后者在超过等待时间后会返回null,这两个方法如果在等待过程中被中断则都会抛出InterruptedException
。在该实现类中,出队操作由dequeue()
实现!
private E dequeue() {
// assert lock.getHoldCount() == 1;
// assert items[takeIndex] != null;
final Object[] items = this.items;
@SuppressWarnings("unchecked")
E x = (E) items[takeIndex];
items[takeIndex] = null;
if (++takeIndex == items.length)
takeIndex = 0;
count--;
if (itrs != null)
itrs.elementDequeued();
notFull.signal();
return x;
}
LinkedBlockingQueue
LinkedBlockingQueue是一个基于链表的"可选"有界的阻塞队列!为什么是可选的有界队列是因为当我们没有指定容量而进行创建时,默认的容量是Integer.MAX_VALUE,而当我们创建时指定了容量,则队列的容量就会固定,不会动态扩容!其构造函数如下:
public LinkedBlockingQueue() {
this(Integer.MAX_VALUE);
}
/**
* Creates a {@code LinkedBlockingQueue} with the given (fixed) capacity.
*
* @param capacity the capacity of this queue
* @throws IllegalArgumentException if {@code capacity} is not greater
* than zero
*/
public LinkedBlockingQueue(int capacity) {
if (capacity <= 0) throw new IllegalArgumentException();
this.capacity = capacity;
last = head = new Node<E>(null);
}
LinkedBlockingQueue内部封装了一个节点类,用于存储我们的数据,其next属性指向下一个节点。
static class Node<E> {
E item;
/**
* One of:
* - the real successor Node
* - this Node, meaning the successor is head.next
* - null, meaning there is no successor (this is the last node)
*/
Node<E> next;
Node(E x) { item = x; }
}
入队和出队操作
LinkedBlockingQueue的出入队列的操作和ArrayBlockingQueue的操作类似,不进行过多的描述,只需要关注enqueue()
和dequeue()
方法即可
private void enqueue(Node<E> node) {
// assert putLock.isHeldByCurrentThread();
// assert last.next == null;
last = last.next = node;
}
/**
* Removes a node from head of queue.
*
* @return the node
*/
private E dequeue() {
// assert takeLock.isHeldByCurrentThread();
// assert head.item == null;
Node<E> h = head;
Node<E> first = h.next;
h.next = h; // help GC
head = first;
E x = first.item;
first.item = null;
return x;
}
出队过程如下所示:
SynchronousQueue
SynchronousQueue是最特殊的一个实现类,其内部没有任何容量,每一次插入操作都必须等待一个remove操作,必须成对出现,否则将会一直阻塞,其内部支持公平锁和非公平锁,默认是非公平锁!公平锁的策略是FIFO(First In First Out)
SynchronousQueue类似于我们在买快递退货时,快递小哥必须来了之后才把货物交给快递小哥。一个萝卜一个坑!!
public static void main(String[] args) {
SynchronousQueue<String> synchronousQueue = new SynchronousQueue();
ExecutorService executorService = Executors.newFixedThreadPool(2);
executorService.execute(() -> {
try {
synchronousQueue.put("1");
System.out.println(Thread.currentThread().getName() + " 1 入列");
synchronousQueue.put("2");
System.out.println(Thread.currentThread().getName() + " 2 入列");
synchronousQueue.put("3");
System.out.println(Thread.currentThread().getName() + " 3 入列");
} catch (InterruptedException e) {
e.printStackTrace();
}
});
executorService.execute(() -> {
try {
System.out.println(Thread.currentThread().getName() + " 出列 = " + synchronousQueue.take());
TimeUnit.SECONDS.sleep(2);
System.out.println(Thread.currentThread().getName() + " 出列 = " + synchronousQueue.take());
TimeUnit.SECONDS.sleep(2);
System.out.println(Thread.currentThread().getName() + " 出列 = " + synchronousQueue.take());
} catch (InterruptedException e) {
e.printStackTrace();
}
});
}
其运行结果如下图所示:
可以看到入列和出列必须成对出现才能往下进行。
掌握了BlockingQueue的几种实现类,回顾Executors创建线程池的方式,就能够更好的了解BlockingQueue实现类使用的场景! 如以下几种创建线程池的方式!
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
public static ExecutorService newSingleThreadExecutor(ThreadFactory threadFactory) {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>(),
threadFactory));
}
DelayQueue 和 PriorityBlockingQueue
延迟队列和优先级队列的方法和上述几个实现类都差不了太多,无非是加一个时间参数或者通过Comparator
实现对象的比较,这两个放在后面再详细去分析吧!好东西要慢慢品尝!
参考资料
- JDK源码
转载自:https://juejin.cn/post/7242676843988156471