likes
comments
collection
share

Java并发编程面试4:并发集合-ConcurrentHashMap、ConcurrentLinkedQueue和 CopyOnWriteArrayList

作者站长头像
站长
· 阅读数 35

引言

并发集合:ConcurrentHashMap、ConcurrentLinkedQueue和CopyOnWriteArrayList在多线程编程中,数据结构的线程安全性至关重要。Java的并发包java.util.concurrent提供了一系列线程安全的集合类,它们可以在多线程环境中使用而不需要额外的同步措施。本文将介绍三种常用的并发集合:ConcurrentHashMapConcurrentLinkedQueueCopyOnWriteArrayList,以及它们的使用场景和性能特点。

正文

ConcurrentHashMap

ConcurrentHashMap的实现原理

ConcurrentHashMap是Java中提供的一个线程安全的哈希表实现。它是java.util.concurrent包的一部分, 旨在提供比Hashtable和同步的HashMap(通过Collections.synchronizedMap()包装的HashMap)更高的并发性能。 以下是其实现原理的简述:

JDK 1.7及之前版本:

  1. 分段锁(Segment Locking): ConcurrentHashMap将内部数据结构分为多个段(Segment),每个段其实就是一个小的哈希表,拥有自己的锁。当一个线程需要访问一个段时,只需获取那个段的锁,这样其他线程可以同时访问其他段,从而提高并发性。

  2. 哈希表: 每个段内部是一个传统的哈希表,使用数组+链表的方式存储键值对。

JDK 1.8及之后版本:

  1. 取消分段锁: 在JDK 1.8中,ConcurrentHashMap放弃了分段锁,改为使用节点锁和CAS操作(Compare-And-Swap)。

  2. 数组+链表+红黑树: 内部数据结构为一个Node数组,Node是存储键值对的基本单元。当链表长度超过一定阈值时,链表会转换为红黑树,以提高搜索效率。

  3. 并发级别: 由于取消了分段锁,ConcurrentHashMap不再使用concurrency level作为构造参数,而是根据需要动态调整内部结构。

  4. 锁分离机制: 通过将数据结构分为多个桶(Bucket),并使用synchronized关键字或CAS操作来控制对这些桶的并发访问,从而实现更细粒度的锁定策略。

核心代码解读

/** 实现 put 和 putIfAbsent 方法 */
final V putVal(K key, V value, boolean onlyIfAbsent) {
    // 检查键或值是否为 null,如果为 null 则抛出空指针异常
    if (key == null || value == null) throw new NullPointerException();
    // 计算键的哈希值
    int hash = spread(key.hashCode());
    // 用于记录链表中的节点数量
    int binCount = 0;
    // 无限循环,尝试插入节点
    for (Node<K,V>[] tab = table;;) {
        Node<K,V> f; int n, i, fh;
        // 如果表为空或者长度为 0,初始化表
        if (tab == null || (n = tab.length) == 0)
            tab = initTable();
        // 如果计算出的桶位置为空,尝试使用 CAS 操作插入新节点
        else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
            if (casTabAt(tab, i, null, new Node<K,V>(hash, key, value, null)))
                break; // 当添加到空桶时不需要加锁
        }
        // 如果桶中第一个节点的哈希值为 MOVED,表示节点正在被转移
        else if ((fh = f.hash) == MOVED)
            tab = helpTransfer(tab, f);
        else {
            // 存储旧值
            V oldVal = null;
            // 锁定桶中的第一个节点
            synchronized (f) {
                // 再次检查节点是否为桶的头节点
                if (tabAt(tab, i) == f) {
                    // 如果是普通节点
                    if (fh >= 0) {
                        binCount = 1;
                        // 遍历链表
			for (Node<K,V> e = f;; ++binCount) {
                            K ek;
                            // 如果找到具有相同键的节点
                            if (e.hash == hash &&
                                ((ek = e.key) == key ||
                                 (ek != null && key.equals(ek)))) {
                                oldVal = e.val;
                                // 如果不是 putIfAbsent 或者节点已存在,则更新值
                                if (!onlyIfAbsent)
                                    e.val = value;
                                break;
                            }
                 // 用于记录前一个节点,以便在链表尾部添加新节点
                            Node<K,V> pred = e;
                            // 如果到达链表尾部,添加新节点
                            if ((e = e.next) == null) {
                                pred.next = new Node<K,V>(hash, key, value, null);
                                break;
                            }
                        }
                    }
                    // 如果节点是 TreeBin 类型,表示桶中的结构已经转换成红黑树
                    else if (f instanceof TreeBin) {
                        Node<K,V> p;
                        binCount = 2;
                   // 在树中插入或更新节点
                        if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key, value)) != null) {
                            oldVal = p.val;
                            // 如果不是 putIfAbsent 或者节点已存在,则更新值
                            if (!onlyIfAbsent)
                                p.val = value;
                        }
                    }
                }
            }
            // 如果链表长度达到阈值,则转换为红黑树
            if (binCount != 0) {
                if (binCount >= TREEIFY_THRESHOLD)
                    treeifyBin(tab, i);
                // 如果旧值存在,返回旧值
                if (oldVal != null)
                    return oldVal;
                break;
            }
        }
    }
    // 增加计数器
    addCount(1L, binCount);
    return null;
}

流程图

检查key/value是否为null
key/value不为null
table为空或长度为0
计算的桶位置不为空
是MOVED
不是MOVED
头节点哈希值fh >= 0
找到相同key的节点
未找到相同key的节点
头节点是TreeBin类型
找到相同key的节点
未找到相同key的节点
链表长度达到阈值
链表长度未达到阈值
链表长度未达到阈值
开始
抛出NullPointerException
计算key的哈希值
获取当前表table
初始化table
尝试在计算的桶位置插入节点
检查头节点是否为MOVED
帮助转移节点
锁定头节点进行操作
遍历链表
根据onlyIfAbsent决定是否更新值
在链表尾部添加新节点
在红黑树中处理插入/更新
如果有更新旧值则返回旧值
结束
转换链表为红黑树
增加计数器

ConcurrentHashMap的优点

  • 高并发性能: 提供比Hashtable和同步HashMap更优的并发性能,尤其在多线程环境下读多写少的场景。
  • 线程安全: 提供完全的线程安全而无需外部同步。
  • 无锁读取: 读操作通常不需要加锁,可以直接访问数据。
  • 扩展性: 在JDK 1.8中,通过动态调整内部结构和适应不同的并发情况,提高了扩展性。

ConcurrentHashMap的缺点

  • 内存占用: 由于其内部结构,ConcurrentHashMap通常比普通的HashMap占用更多的内存。
  • 写操作开销: 虽然读操作非常快,但写操作由于涉及锁和可能的数据结构调整,其性能可能低于非线程安全的HashMap
  • 复杂性: 相比于简单的HashMapConcurrentHashMap的实现更复杂,理解其内部工作机制也更有难度。

ConcurrentHashMap使用场景和特点

使用场景:

  • 高并发环境: 在需要多个线程频繁读写共享数据的场景中,如缓存、共享配置数据等。
  • Web服务器: 存储用户会话信息,其中对会话的读取远多于修改。
  • 实时应用: 在实时计算中用于存储中间状态,需要快速响应和更新数据。

使用特点:

  • 线程安全: 提供完整的线程安全支持,无需外部同步。
  • 高并发性能: 通过分离锁(在JDK 1.7及之前)或使用CAS操作(在JDK 1.8及之后),允许多个线程同时读写不同段的数据。
  • 键值对存储: 适用于需要键值对存储结构的场景。
  • 动态扩容: 自动扩容以适应不断增长的数据量。
  • 迭代器弱一致性: 迭代器反映创建时的状态,不会抛出ConcurrentModificationException

测试代码示例

package com.dereksmart.crawling.future;

import java.util.HashMap;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
/**
 * @Author derek_smart
 * @Date 2024/8/7 8:06
 * @Description  ConcurrentHashMap测试类
 */
public class ConcurrentHashMapPerformanceTest {
    private static final int THREAD_COUNT = 8; // 并发线程数量
    private static final int NUMBER_OF_OPERATIONS = 100000; // 每个线程执行的操作数量

    private final ConcurrentHashMap<Integer, Integer> map = new ConcurrentHashMap<>();
    private final HashMap<Integer, Integer> map1 = new HashMap<>();

    private final CountDownLatch startLatch = new CountDownLatch(1);
    private final CountDownLatch endLatch = new CountDownLatch(THREAD_COUNT);

    public void testPerformance() throws InterruptedException {
        ExecutorService executor = Executors.newFixedThreadPool(THREAD_COUNT);

        // 启动写入任务
        for (int i = 0; i < THREAD_COUNT; i++) {
            final int threadNum = i;
            executor.submit(() -> {
                try {
                    startLatch.await();
                    for (int j = 0; j < NUMBER_OF_OPERATIONS; j++) {
                        map.put(threadNum * NUMBER_OF_OPERATIONS + j, j);
                        map1.put(threadNum * NUMBER_OF_OPERATIONS + j, j);
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                } finally {
                    endLatch.countDown();
                }
            });
        }

        // 启动读取任务
        for (int i = 0; i < THREAD_COUNT; i++) {
            executor.submit(() -> {
                try {
                    startLatch.await();
                    for (int j = 0; j < NUMBER_OF_OPERATIONS; j++) {
                        map.get(j);
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                } finally {
                    endLatch.countDown();
                }
            });
        }

        long startTime = System.nanoTime();
        startLatch.countDown(); // 启动所有线程
        endLatch.await(); // 等待所有线程完成
        long endTime = System.nanoTime();

        executor.shutdown();
        executor.awaitTermination(1, TimeUnit.MINUTES);

        long duration = TimeUnit.NANOSECONDS.toMillis(endTime - startTime);
        System.out.println("Total time taken: " + duration + " ms");
        System.out.println("Map size: " + map.size());

        System.out.println("Map1 size: " + map1.size());
    }

    public static void main(String[] args) throws InterruptedException {
        new ConcurrentHashMapPerformanceTest().testPerformance();
    }
}

Java并发编程面试4:并发集合-ConcurrentHashMap、ConcurrentLinkedQueue和 CopyOnWriteArrayList 根据对比可以很好的发现ConcurrentHashMap在多线程中具有现场安全性,保证了数据准确性。

ConcurrentLinkedQueue

ConcurrentLinkedQueue的实现原理

ConcurrentLinkedQueue是一个基于链接节点的无界线程安全队列,它实现了Queue接口并使用了非阻塞算法。它在java.util.concurrent包中,是一个适用于高并发场景的队列。以下是其实现原理的简述:

  1. 非阻塞算法: ConcurrentLinkedQueue使用了Michael & Scott (M&S)算法,这是一个无锁的并发算法,它使用CAS(Compare-And-Swap)操作来管理其内部链表的头部和尾部。

  2. 单向链表结构: 队列的每个元素都是链表中的一个节点,每个节点通过next引用指向链表中的下一个节点。

  3. 头尾节点: 队列维护两个指针,分别指向头部和尾部的节点。入队操作在尾部进行,出队操作在头部进行。

  4. CAS操作: 在添加或删除元素时,ConcurrentLinkedQueue使用CAS操作来确保节点的原子性更新,避免了使用锁,这样多个线程可以同时操作队列。

  5. 无锁的迭代器: ConcurrentLinkedQueue提供了弱一致性的迭代器,即迭代器不会反映出迭代器创建后的修改。

代码解读

public boolean offer(E e) {  
    checkNotNull(e); // 检查传入的元素e是否为null,如果是null则抛出异常。  
    final Node<E> newNode = new Node<E>(e); // 创建一个新的节点,其包含要添加的元素e。  
  
    for (Node<E> t = tail, p = t;;) { // 从尾节点开始,使用两个指针t和p。  
        Node<E> q = p.next; // 取得p指针指向的节点的下一个节点。  
        if (q == null) { // 如果q为null,表示p指向的节点是最后一个节点。  
            if (p.casNext(null, newNode)) { // 使用CAS操作尝试将newNode设置为p节点的下一个节点。  
                // 如果CAS操作成功,这个操作就是newNode变成队列一部分的线性化点
                // 也是newNode变得“活跃”的点。  
                if (p != t) // 如果p不等于t,表示在执行过程中,尾节点已经向前移动了。  
                    casTail(t, newNode);  // 尝试使用CAS操作更新尾节点为newNode,失败也没关系。  
                return true// 添加元素成功,返回true。  
            }  
            // 如果CAS操作失败,表示有其他线程在竞争添加元素,需要重新读取p节点的下一个节点。  
        }  
        else if (p == q) { // 如果p等于q,表示我们已经不在列表上了。  
            // 如果尾节点没有变化,它也会不在列表上,这种情况下我们需要跳到头节点,  
            // 因为所有活跃的节点都可以从头节点访问到。否则,新的尾节点是更好的选择。  
            p = (t != (t = tail)) ? t : head;  
        }  
        else {  
            // 如果p不等于t并且尾节点t有更新,那么使用新的尾节点t,否则使用q。  
            p = (p != t && t != (t = tail(linearization point),)) ? t : q;  
        }  
    }  
}  

ConcurrentLinkedQueue的优点

  • 线程安全: 通过使用非阻塞算法,ConcurrentLinkedQueue提供了线程安全的队列操作,无需外部同步。

  • 高并发性能: 由于无锁的设计,多个线程可以并发地进行入队和出队操作,这在高并发场景下提供了很好的性能。

  • 无锁的迭代器: 迭代器的实现是弱一致性的,它可以与其他线程的入队和出队操作并发进行,而不会抛出ConcurrentModificationException

  • 无容量限制: 作为一个无界队列,ConcurrentLinkedQueue不会因为容量限制而阻塞入队操作。

ConcurrentLinkedQueue的缺点

  • 内存占用: 每个元素都作为一个独立的对象存在,与它们的next引用一起,这会导致比数组基础的队列更多的内存开销。

  • 高并发下的性能开销: 虽然无锁设计在大多数情况下效率很高,但在极端的高并发场景下,频繁的CAS操作可能会导致性能瓶颈。

  • 弱一致性迭代器: 迭代器不保证展示队列的实时状态,这在某些需要强一致性迭代的场景中可能不是最佳选择。

  • 难以确定当前大小: 由于队列的动态变化,计算队列的当前大小可能会很耗时,且结果可能在计算完成时已不再准确。

ConcurrentLinkedQueue是为那些需要高并发入队和出队操作的场景设计的,它适用于生产者-消费者、工作窃取和其他并发设计模式。然而,如果对内存的使用有严格的要求,或者需要一个有界队列以限制资源使用,那么可能需要考虑其他类型的队列。

ConcurrentLinkedQueue使用场景和特点

使用场景:

  • 生产者-消费者模型: 在多个线程生产和消费数据的场景中,如消息队列、事件处理等。
  • 任务调度: 存储待处理的任务,如线程池的工作队列。
  • 无阻塞算法: 适用于需要无阻塞算法来提高性能的场景。

使用特点:

  • 无锁设计: 使用CAS操作实现无锁的并发控制,减少了锁竞争。
  • 无界队列: 动态增长,理论上没有容量限制,适用于处理未知数量的元素。
  • FIFO顺序: 保证元素的先进先出顺序。
  • 高并发性能: 在多线程环境中提供良好的性能。
  • 迭代器弱一致性: 迭代器提供了与修改操作无关的弱一致性视图。

测试代码示例

package com.dereksmart.crawling.future;

import java.util.LinkedList;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
/**
 * @Author derek_smart
 * @Date 2024/8/7 8:18
 * @Description  ConcurrentHashMap测试类
 */
public class ConcurrentLinkedQueuePerformanceTest {
    private static final int THREAD_COUNT = 16; // 并发线程数量
    private static final int NUMBER_OF_OPERATIONS = 500000; // 每个线程执行的操作数量

    private final ConcurrentLinkedQueue<Integer> queue = new ConcurrentLinkedQueue<>();

    private final CountDownLatch startLatch = new CountDownLatch(1);
    private final CountDownLatch endLatch = new CountDownLatch(THREAD_COUNT);

    public void testPerformance() throws InterruptedException {
        ExecutorService executor = Executors.newFixedThreadPool(THREAD_COUNT);

        // 启动入队(offer)任务
        for (int i = 0; i < THREAD_COUNT / 2; i++) {
            executor.submit(() -> {
                try {
                    startLatch.await();
                    for (int j = 0; j < NUMBER_OF_OPERATIONS; j++) {
                        queue.offer(j);
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                } finally {
                    endLatch.countDown();
                }
            });
        }

        // 启动出队(poll)任务
        for (int i = 0; i < THREAD_COUNT / 2; i++) {
            executor.submit(() -> {
                try {
                    startLatch.await();
                    for (int j = 0; j < NUMBER_OF_OPERATIONS; j++) {
                        queue.poll();
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                } finally {
                    endLatch.countDown();
                }
            });
        }

        long startTime = System.nanoTime();
        startLatch.countDown(); // 启动所有线程
        endLatch.await(); // 等待所有线程完成


        long endTime = System.nanoTime();

        executor.shutdown();
        executor.awaitTermination(1, TimeUnit.MINUTES);

        long duration = TimeUnit.NANOSECONDS.toMillis(endTime - startTime);
        System.out.println("Total time taken for " + (THREAD_COUNT / 2 * NUMBER_OF_OPERATIONS) + " operations: " + duration + " ms"+queue.size());

    }

    public static void main(String[] args) throws InterruptedException {
        new ConcurrentLinkedQueuePerformanceTest().testPerformance();
    }
}

CopyOnWriteArrayList

CopyOnWriteArrayList的实现原理

CopyOnWriteArrayList是一个线程安全的ArrayList变体,它通过一种称为写时复制(copy-on-write)的技术来避免并发问题。以下是其实现原理的简述:

  1. 写时复制机制CopyOnWriteArrayList在修改操作(如addsetremove等)时,不直接在当前数组上进行修改。相反,它会先创建当前数组的一个副本,然后在这个副本上执行修改操作,最后将原数组引用指向新的修改后的数组。

  2. 数组存储:所有元素都存储在一个内部的volatile数组中,确保数组内容的可见性。

  3. 锁的使用:在执行复制和修改操作时,会使用一个锁(通常是一个简单的对象锁)来保证操作的原子性。

  4. 迭代器弱一致性CopyOnWriteArrayList的迭代器提供弱一致性,意味着它反映的是创建迭代器时数组的状态,而不是最新的状态。迭代器不会抛出ConcurrentModificationException

CopyOnWriteArrayList的优点

  • 线程安全:提供线程安全的列表操作,无需额外的同步措施。

  • 读操作高效:获取操作(如get)不需要加锁,因为它们操作的是不变的数组副本。

  • 迭代器弱一致性:迭代器不会因为并发修改而失败,适合在遍历时进行写操作的场景。

  • 无锁读取:读取操作不需要锁定,因此在读多写少的场景下性能很好。

CopyOnWriteArrayList的缺点

  • 内存开销:由于写操作需要复制整个数组,所以可能会占用大量内存,尤其是当列表很大时。

  • 写操作成本高:每次写操作都需要复制整个数组,这可能导致大量的系统调用和内存复制操作。

  • 不适合写频繁的场景:如果有大量的写操作,CopyOnWriteArrayList的性能可能会大幅下降。

  • 迭代器一致性问题:虽然迭代器提供弱一致性,但在某些场景下可能需要反映列表的最新状态。

CopyOnWriteArrayList使用场景和特点

使用场景:

  • 读多写少: 在读操作远多于写操作的场景中,如配置管理、访问控制列表等。
  • 事件监听器列表: 存储事件监听器,其中监听器的添加和移除不频繁,但可能在迭代时修改。
  • 实时读取: 需要保证实时读取数据的一致性,而不需要实时反映写入的场景。

使用特点:

  • 写时复制机制: 修改操作通过创建数组副本来实现,保证了写操作的安全性。
  • 线程安全: 提供线程安全的列表操作,无需额外同步。
  • 读操作无锁: 读取操作不需要加锁,提供了很好的读性能。
  • 内存开销较大: 写操作需要复制整个数组,可能导致较大的内存开销。
  • 迭代器弱一致性: 迭代器不会反映出迭代器创建之后的修改。

CopyOnWriteArrayList非常适合读操作远多于写操作的场景,如事件监听器列表、缓存等。然而,如果应用程序需要频繁地修改列表,或者列表元素非常多,那么CopyOnWriteArrayList可能不是一个好的选择,因为每次修改都会导致大量的内存复制,从而影响性能和资源使用。在这种情况下,可以考虑使用其他并发集合,如ConcurrentHashMap的键集合或Collections.synchronizedList等。

测试代码示例

package com.dereksmart.crawling.future;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
/**
 * @Author derek_smart
 * @Date 2024/8/7 8:39
 * @Description  CopyOnWriteArrayList测试类
 */
public class CopyOnWriteArrayListPerformanceTest {
    private static final int THREAD_COUNT = 16; // 并发线程数量
    private static final int NUMBER_OF_OPERATIONS = 10000; // 每个线程执行的操作数量

    private final CopyOnWriteArrayList<Integer> list = new CopyOnWriteArrayList<>();

    private final CountDownLatch startLatch = new CountDownLatch(1);
    private final CountDownLatch endLatch = new CountDownLatch(THREAD_COUNT);

    public void testPerformance() throws InterruptedException {
        ExecutorService executor = Executors.newFixedThreadPool(THREAD_COUNT);

        // 启动写入任务
        for (int i = 0; i < THREAD_COUNT / 4; i++) {
            executor.submit(() -> {
                try {
                    startLatch.await();
                    for (int j = 0; j < NUMBER_OF_OPERATIONS; j++) {
                        list.add(j);
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                } finally {
                    endLatch.countDown();
                }
            });
        }

        // 启动读取任务
        for (int i = 0; i < THREAD_COUNT - THREAD_COUNT / 4; i++) {
            executor.submit(() -> {
                try {
                    startLatch.await();
                    for (int j = 0; j < NUMBER_OF_OPERATIONS; j++) {
                        list.get(j % list.size());
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                } finally {
                    endLatch.countDown();
                }
            });
        }

        long startTime = System.nanoTime();
        startLatch.countDown(); // 启动所有线程
        endLatch.await(); // 等待所有线程完成

        long endTime = System.nanoTime();

        executor.shutdown();
        executor.awaitTermination(1, TimeUnit.MINUTES);

        long duration = TimeUnit.NANOSECONDS.toMillis(endTime - startTime);
        System.out.println("Total time taken for " + (THREAD_COUNT * NUMBER_OF_OPERATIONS) + " operations: " + duration + " ms");
    }

    public static void main(String[] args) throws InterruptedException {
        new CopyOnWriteArrayListPerformanceTest().testPerformance();
    }
}

结论

这些并发集合的性能因使用场景而异。ConcurrentHashMap的分段锁策略在保持高并发性能的同时,提供了足够的一致性保证。ConcurrentLinkedQueue的非阻塞算法使其在高并发场景下表现出色。而CopyOnWriteArrayList虽然写操作较慢,但在读多写少的场景下,其读操作的性能非常高。

在选择使用哪种并发集合时,应该根据实际的应用需求和预期的工作负载来做出决策。例如,如果预期写操作不频繁,但需要快速、一致的读取操作,CopyOnWriteArrayList可能是一个合适的选择。相反,如果应用需要支持大量的并发写操作,那么ConcurrentHashMapConcurrentLinkedQueue可能更适合。。

转载自:https://juejin.cn/post/7399985328709681178
评论
请登录