likes
comments
collection
share

并发容器之ConcurrentLinkedQueue

作者站长头像
站长
· 阅读数 4

ConcurrentLinkedQueue

ConcurrentLinkedQueue是一种适用于高并发场景下的队列,通过无锁的方式,实现了高并发状态下的高性能,由于ConcurrentLinkedQueue是一种非阻塞的队列,通常ConcurrentLinkedQueue性能好于BlockingQueue。是一种基于链表节点的无界线程安全队列。该队列的元素遵循先进先出FIFO的原则,该队列不允许为null

源码分析

内部的队列使用单向链表的方式实现,新元素会被插入队列末尾,出队时从队列头部获取一个元素,队列进行出队入队时对节点的操作是通过CAS实现的,保证线程安全

// 队首
private transient volatile Node<E> head;
// 队尾
private transient volatile Node<E> tail;

public ConcurrentLinkedQueue() {
  	// 默认头节点、尾结点是Node中为null的哨兵节点
  	// 初始时,head、tail 都指向同一个 item 为 null 的节点
    head = tail = new Node<E>(null);
}

private static class Node<E> {
  // 存放节点的值
  volatile E item;
  // 存放下一个节点
  volatile Node<E> next;
// 内部的操作全部依赖于UNSAFE的CAS操作来实现原子性
  Node(E item) {
    UNSAFE.putObject(this, itemOffset, item);
  }
	// 更改Node中的数据域item
  boolean casItem(E cmp, E val) {
    return UNSAFE.compareAndSwapObject(this, itemOffset, cmp, val);
  }
// 更改Node中的指针域next
  void lazySetNext(Node<E> val) {
    UNSAFE.putOrderedObject(this, nextOffset, val);
  }
// 更改Node中的指针域next
  boolean casNext(Node<E> cmp, Node<E> val) {
    return UNSAFE.compareAndSwapObject(this, nextOffset, cmp, val);
  }

  // Unsafe mechanics

  private static final sun.misc.Unsafe UNSAFE;
  // 偏移量
  private static final long itemOffset;
  // 下一个元素的偏移量
  private static final long nextOffset;
}

重要方法

add()和offer()都是加入元素的方法,add方法内部也是调用的offer方法

offer方法入队
public boolean offer(E e) {
  // 检查是否为null,会抛出空指针
    checkNotNull(e);
  // 构造Node节点
    final Node<E> newNode = new Node<E>(e);
// 从尾结点进行插入,循环直到成功为止
    for (Node<E> t = tail, p = t;;) {
        Node<E> q = p.next;
      // 如果q为null,说明当前p是尾结点,尝试加入到队尾,如果加入失败,表示其他线程已经修改了p的指向
        if (q == null) { // 初始时,head、tail 都指向同一个 item 为 null 的节点
            // 使用CAS操作设置p节点的next节点,但是没有更新尾结点
          // 如果有多线程操作,会导致第一次CAS操作失败,再次执行for循环
            if (p.casNext(null, newNode)) { // CAS操作成功,新增节点被放入到链表中
                
              	// p!=t,表示有多线程操作,导致第一次cas操作没有成功,此次不是第一次cas操作,此时在进行设置尾结点
                if (p != t) // hop two nodes at a time
                  // 设置当前尾结点为新插入的节点
                    casTail(t, newNode);  // Failure is OK.
                return true;
            }
            // Lost CAS race to another thread; re-read next
        }
        else if (p == q) // 多线程操作时,由于poll操作移除元素后可能会把head变成自引用(环形链表),此时head的next节点也是head,所以需要重新找到新的head
            // We have fallen off list.  If tail is unchanged, it
            // will also be off-list, in which case we need to
            // jump to head, from which all live nodes are always
            // reachable.  Else the new tail is a better bet.
          
            p = (t != (t = tail)) ? t : head;
        else
            // Check for tail updates after two hops.
          // 寻找尾结点,找到当前的尾结点之后,再次执行for循环
            p = (p != t && t != (t = tail)) ? t : q;
    }
}

poll()和peek()都是取头元素节点,前者会删除元素,后者不会删除元素

poll方法出队
public E poll() {
    restartFromHead:
    for (;;) {
      // 从头节点开始遍历
        for (Node<E> h = head, p = h, q;;) {
          // 保存当前节点
            E item = p.item;
					// 当前节点有值,并且使用CAS操作将当前节点变为null成功
            if (item != null && p.casItem(item, null)) {
                // Successful CAS is the linearization point
                // for item to be removed from this queue.
              // CAS操作成功,则标记当前节点从链表中移除
              // 只有多线程操作时,使得第一次p!=h时才会设置头节点为p
                if (p != h) // hop two nodes at a time
                    updateHead(h, ((q = p.next) != null) ? q : p);
                return item;
            }
          // 当前队列为空 
            else if ((q = p.next) == null) {
                updateHead(h, p);
                return null;
            }
          // 多线程同时操作时才会出现该情况,当前节点自引用,需要重新寻找新的队列头节点
            else if (p == q)
                continue restartFromHead;
            else // 多线程操作时,会导致第一次判断时item为null,且此时已经有了新插入的节点了,需要重新指定头节点
                p = q;
        }
    }
}
peek方法
// 与poll方法类似,只是少了cas操作来清空头节点的值
public E peek() {
    restartFromHead:
    for (;;) {
        for (Node<E> h = head, p = h, q;;) {
            E item = p.item;
            if (item != null || (q = p.next) == null) {
                updateHead(h, p);
                return item;
            }
            else if (p == q)
                continue restartFromHead;
            else
                p = q;
        }
    }
}
remove方法

如果队列中存在该元素,则删除该元素

public boolean remove(Object o) {
    if (o != null) {
        Node<E> next, pred = null;
        for (Node<E> p = first(); p != null; pred = p, p = next) {
            boolean removed = false;
            E item = p.item;
            if (item != null) {
                if (!o.equals(item)) {
                    next = succ(p);
                    continue;
                }
              // 使用cas操作来进行remove
                removed = p.casItem(item, null);
            }

            next = succ(p);
          // 如果有前驱节点,并且next不为空,则需要将这两个连接起来
            if (pred != null && next != null) // unlink
                pred.casNext(p, next);
            if (removed)
                return true;
        }
    }
    return false;
}

head和tail的更新时机

tail 更新时机:tail 节点不总是尾节点,如果 tail 节点的 next 节点不为空,则将入队节点设置成 tail 节点;如果 tail 节点的 next 节点为空,则只入队不更新尾节点。

head 更新时机:并不是每次出队时都更新 head 节点,当 head 节点里有元素时,直接弹出 head 节点里的元素,而不会更新 head 节点;只有当 head 节点里没有元素时,出队操作才会更新 head 节点。

head 和 tail 的更新总是间隔了一个,是为了减少CAS的更新操作,如果大量的入队操作,每次都要执行 CAS 进行 tail 的更新,汇总起来对性能也会是大大的损耗。如果能减少 CAS 更新的操作,无疑可以大大提升入队的操作效率

zhhll.icu/2021/多线程/并发…

本文由mdnice多平台发布

转载自:https://juejin.cn/post/7349119682002436096
评论
请登录