likes
comments
collection
share

【多线程04】你好,BlockingQueue!

作者站长头像
站长
· 阅读数 38

多线程03文章里,简单的描述了线程池的几个参数,其中提到了一个没有容量的特殊队列SynchronousQueue,其实现了BlockingQueue接口。当时只是简单的提及了一下,接下来就系统的学习一下BlockingQueue。本文主要有以下内容:

  • BlockingQueue接口介绍
  • ArrayBlockingQueue
  • LinkedBlockingQueue
  • SynchronousQueue

java.lang.concurrent 包下,定义了许多和并发相关的容器类,如Concurrent开头的类ConcurrentHashMap、ConcurrentLinkedQueue,亦或是以CopyOnWrite开头的类,如CopyOnWriteArrayList,包括BlockingQueue接口的实现类。这些都是巨佬Doug Lea给我们提供好的一些并发容器,在没有这些容器之前,我们通常需要自己去处理线程同步、异步、资源的获取和释放等各种各样的并发问题。如今只需要学会这些用法即可。本文主要内容为BlockingQueue相关的实现类,其他的并发容器会在后续博客介绍。

BlockingQueue

BlockingQueue是一个阻塞队列,典型的使用场景就是在生产者和消费者模式中,充当一个数据缓冲区,用于存储生产者生产的"数据"或者供消费者"拿取数据"的地方,就相当于一个仓库。队列是一种先进先出的数据结构。

BlockingQueue接口中定义了如下四类方法:

功能\类别Throw exception(抛异常)Special value(特定值)Blocks(阻塞)time out(超时)
insert(插入)add(e)offer(e)put(e)offer(e,time,unit)
remove(删除)remove()poll()take()poll(time,unit)
Examine(检查)element()peek()//

使用BlockingQueue时,不能添加null值,在使用插入相关的api时,如果添加的元素为null,则将抛出空指针异常。null 在BlockingQueue中有特殊的含义,当进行poll()操作时(该方法返回队列的头节点,并删除,如果超过等待的时间则返回null),该方法有可能返回null值,因此如果允许添加null元素,则null的含义将不明确!

BlockingQueue 如果没有设置容量,则容量为 Integer.MAX_VALUE。如果指定了队列的容量则可以通过remainingCapacity()方法获取容量,如果还有空间则可以直接放入,反之则不能。

BlockingQueue的实现类都是线程安全的,所有的方法都通过内部锁或者其他方式保证了线程安全。

BlockingQueue的主要实现类有如下图:

【多线程04】你好,BlockingQueue!

在本文中,关注的重点实现类有:

  • ArrayBlockingQueue
  • LinkedBlockingQueue
  • SynchronousQueue

接下来将按照顺序将这三个实现类抽丝剥茧!

ArrayBlockingQueue

ArrayBlockingQueue是一个基于数组实现的有界队列,一旦创建将无法改变容量,当容量已满时,往数组中添加元素将会阻塞,当数组为空时,从数组中取出数据将阻塞。

內部是一个Object的数组,维护了takeIndex putIndex count这三个变量用于维护对应的取值、存值和队列中当前元素的个数操作。

  • takeIndex:items index for next take, poll, peek or remove
  • putIndex:items index for next put, offer, or add
  • count:Number of elements in the queue

通过Condition对象notEmptynotFull来进行同步控制!

入队操作

add()offer()put()这三个方法的异同:

相同点:这三个方法都是将一个元素插入队列,

不同点:

add(): 是通过调用父类的add()实现的,插入成功将返回true,插入失败将抛出IllegalStateException或者NullPointerException,前者是队列已满,后者是插入的元素为null时抛出的异常。

    public boolean add(E e) {
        return super.add(e);
    }

offer():通过enqueue()方法将元素插入队列中,队列已满将插入失败这样会返回false,如果元素为 null 也会抛出NullPointerException,该方法优于add()

    public boolean offer(E e) {
        checkNotNull(e);
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            if (count == items.length)
                return false;
            else {
                enqueue(e);
                return true;
            }
        } finally {
            lock.unlock();
        }
    }

put():从方法的实现就可以看出,如果队列已满,方法将阻塞,直到被唤醒将元素插入后,才会释放锁资源。如果元素为null将抛出NullPointerException,或者在获取锁资源的过程中抛出InterruptedException

    public void put(E e) throws InterruptedException {
        checkNotNull(e);
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            while (count == items.length)
                notFull.await();
            enqueue(e);
        } finally {
            lock.unlock();
        }
    }

enqueue():私有方法,将元素插入到队列中,putIndex+1,count+1。

    private void enqueue(E x) {
        // assert lock.getHoldCount() == 1;
        // assert items[putIndex] == null;
        final Object[] items = this.items;
        items[putIndex] = x;
        if (++putIndex == items.length)
            putIndex = 0;
        count++;
        notEmpty.signal();
    }

出队操作

take()poll(time,unit)都来自于BlockingQueue接口,均会从队列中删除返回的头节点,前者会一直阻塞到节点可以,后者在超过等待时间后会返回null,这两个方法如果在等待过程中被中断则都会抛出InterruptedException。在该实现类中,出队操作由dequeue()实现!

    private E dequeue() {
        // assert lock.getHoldCount() == 1;
        // assert items[takeIndex] != null;
        final Object[] items = this.items;
        @SuppressWarnings("unchecked")
        E x = (E) items[takeIndex];
        items[takeIndex] = null;
        if (++takeIndex == items.length)
            takeIndex = 0;
        count--;
        if (itrs != null)
            itrs.elementDequeued();
        notFull.signal();
        return x;
    }

LinkedBlockingQueue

LinkedBlockingQueue是一个基于链表的"可选"有界的阻塞队列!为什么是可选的有界队列是因为当我们没有指定容量而进行创建时,默认的容量是Integer.MAX_VALUE,而当我们创建时指定了容量,则队列的容量就会固定,不会动态扩容!其构造函数如下:

   public LinkedBlockingQueue() {
       this(Integer.MAX_VALUE);
   }

   /**
   * Creates a {@code LinkedBlockingQueue} with the given (fixed) capacity.
   *
   * @param capacity the capacity of this queue
   * @throws IllegalArgumentException if {@code capacity} is not greater
   *         than zero
   */
   public LinkedBlockingQueue(int capacity) {
       if (capacity <= 0) throw new IllegalArgumentException();
       this.capacity = capacity;
       last = head = new Node<E>(null);
   }

LinkedBlockingQueue内部封装了一个节点类,用于存储我们的数据,其next属性指向下一个节点。

static class Node<E> {
    E item;
    /**
    * One of:
    * - the real successor Node
    * - this Node, meaning the successor is head.next
    * - null, meaning there is no successor (this is the last node)
    */
    
    Node<E> next;
    Node(E x) { item = x; }
}

入队和出队操作

LinkedBlockingQueue的出入队列的操作和ArrayBlockingQueue的操作类似,不进行过多的描述,只需要关注enqueue()dequeue()方法即可

    private void enqueue(Node<E> node) {
        // assert putLock.isHeldByCurrentThread();
        // assert last.next == null;
        last = last.next = node;
    }

    /**
     * Removes a node from head of queue.
     *
     * @return the node
     */
    private E dequeue() {
        // assert takeLock.isHeldByCurrentThread();
        // assert head.item == null;
        Node<E> h = head;
        Node<E> first = h.next;
        h.next = h; // help GC
        head = first;
        E x = first.item;
        first.item = null;
        return x;
    }

出队过程如下所示:

【多线程04】你好,BlockingQueue!

SynchronousQueue

SynchronousQueue是最特殊的一个实现类,其内部没有任何容量,每一次插入操作都必须等待一个remove操作,必须成对出现,否则将会一直阻塞,其内部支持公平锁和非公平锁,默认是非公平锁!公平锁的策略是FIFO(First In First Out)

SynchronousQueue类似于我们在买快递退货时,快递小哥必须来了之后才把货物交给快递小哥。一个萝卜一个坑!!

    public static void main(String[] args) {
        SynchronousQueue<String> synchronousQueue = new SynchronousQueue();
        ExecutorService executorService = Executors.newFixedThreadPool(2);
        executorService.execute(() -> {
            try {
                synchronousQueue.put("1");
                System.out.println(Thread.currentThread().getName() + " 1 入列");
                synchronousQueue.put("2");
                System.out.println(Thread.currentThread().getName() + " 2 入列");
                synchronousQueue.put("3");
                System.out.println(Thread.currentThread().getName() + " 3 入列");
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        });
        executorService.execute(() -> {
            try {
                System.out.println(Thread.currentThread().getName() + " 出列 = " + synchronousQueue.take());
                TimeUnit.SECONDS.sleep(2);
                System.out.println(Thread.currentThread().getName() + " 出列 = " + synchronousQueue.take());
                TimeUnit.SECONDS.sleep(2);
                System.out.println(Thread.currentThread().getName() + " 出列 = " + synchronousQueue.take());
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        });
    }

其运行结果如下图所示:

【多线程04】你好,BlockingQueue!

可以看到入列和出列必须成对出现才能往下进行。

掌握了BlockingQueue的几种实现类,回顾Executors创建线程池的方式,就能够更好的了解BlockingQueue实现类使用的场景! 如以下几种创建线程池的方式!


public static ExecutorService newCachedThreadPool() {
    return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                  60L, TimeUnit.SECONDS,
                                  new SynchronousQueue<Runnable>());
}


public static ExecutorService newSingleThreadExecutor(ThreadFactory threadFactory) {
    return new FinalizableDelegatedExecutorService
        (new ThreadPoolExecutor(1, 1,
                                0L, TimeUnit.MILLISECONDS,
                                new LinkedBlockingQueue<Runnable>(),
                                threadFactory));
}
  

DelayQueue 和 PriorityBlockingQueue

延迟队列和优先级队列的方法和上述几个实现类都差不了太多,无非是加一个时间参数或者通过Comparator实现对象的比较,这两个放在后面再详细去分析吧!好东西要慢慢品尝!

参考资料

  • JDK源码
转载自:https://juejin.cn/post/7242676843988156471
评论
请登录