Java并发编程面试4:并发集合-ConcurrentHashMap、ConcurrentLinkedQueue和 CopyOnWriteArrayList
引言
并发集合:ConcurrentHashMap、ConcurrentLinkedQueue和CopyOnWriteArrayList在多线程编程中,数据结构的线程安全性至关重要。Java的并发包java.util.concurrent
提供了一系列线程安全的集合类,它们可以在多线程环境中使用而不需要额外的同步措施。本文将介绍三种常用的并发集合:ConcurrentHashMap
、ConcurrentLinkedQueue
和CopyOnWriteArrayList
,以及它们的使用场景和性能特点。
正文
ConcurrentHashMap
ConcurrentHashMap的实现原理
ConcurrentHashMap
是Java中提供的一个线程安全的哈希表实现。它是java.util.concurrent包的一部分,
旨在提供比Hashtable
和同步的HashMap
(通过Collections.synchronizedMap()
包装的HashMap
)更高的并发性能。
以下是其实现原理的简述:
JDK 1.7及之前版本:
-
分段锁(Segment Locking):
ConcurrentHashMap
将内部数据结构分为多个段(Segment),每个段其实就是一个小的哈希表,拥有自己的锁。当一个线程需要访问一个段时,只需获取那个段的锁,这样其他线程可以同时访问其他段,从而提高并发性。 -
哈希表: 每个段内部是一个传统的哈希表,使用数组+链表的方式存储键值对。
JDK 1.8及之后版本:
-
取消分段锁: 在JDK 1.8中,
ConcurrentHashMap
放弃了分段锁,改为使用节点锁和CAS操作(Compare-And-Swap)。 -
数组+链表+红黑树: 内部数据结构为一个Node数组,Node是存储键值对的基本单元。当链表长度超过一定阈值时,链表会转换为红黑树,以提高搜索效率。
-
并发级别: 由于取消了分段锁,
ConcurrentHashMap
不再使用concurrency level作为构造参数,而是根据需要动态调整内部结构。 -
锁分离机制: 通过将数据结构分为多个桶(Bucket),并使用synchronized关键字或CAS操作来控制对这些桶的并发访问,从而实现更细粒度的锁定策略。
核心代码解读
/** 实现 put 和 putIfAbsent 方法 */
final V putVal(K key, V value, boolean onlyIfAbsent) {
// 检查键或值是否为 null,如果为 null 则抛出空指针异常
if (key == null || value == null) throw new NullPointerException();
// 计算键的哈希值
int hash = spread(key.hashCode());
// 用于记录链表中的节点数量
int binCount = 0;
// 无限循环,尝试插入节点
for (Node<K,V>[] tab = table;;) {
Node<K,V> f; int n, i, fh;
// 如果表为空或者长度为 0,初始化表
if (tab == null || (n = tab.length) == 0)
tab = initTable();
// 如果计算出的桶位置为空,尝试使用 CAS 操作插入新节点
else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
if (casTabAt(tab, i, null, new Node<K,V>(hash, key, value, null)))
break; // 当添加到空桶时不需要加锁
}
// 如果桶中第一个节点的哈希值为 MOVED,表示节点正在被转移
else if ((fh = f.hash) == MOVED)
tab = helpTransfer(tab, f);
else {
// 存储旧值
V oldVal = null;
// 锁定桶中的第一个节点
synchronized (f) {
// 再次检查节点是否为桶的头节点
if (tabAt(tab, i) == f) {
// 如果是普通节点
if (fh >= 0) {
binCount = 1;
// 遍历链表
for (Node<K,V> e = f;; ++binCount) {
K ek;
// 如果找到具有相同键的节点
if (e.hash == hash &&
((ek = e.key) == key ||
(ek != null && key.equals(ek)))) {
oldVal = e.val;
// 如果不是 putIfAbsent 或者节点已存在,则更新值
if (!onlyIfAbsent)
e.val = value;
break;
}
// 用于记录前一个节点,以便在链表尾部添加新节点
Node<K,V> pred = e;
// 如果到达链表尾部,添加新节点
if ((e = e.next) == null) {
pred.next = new Node<K,V>(hash, key, value, null);
break;
}
}
}
// 如果节点是 TreeBin 类型,表示桶中的结构已经转换成红黑树
else if (f instanceof TreeBin) {
Node<K,V> p;
binCount = 2;
// 在树中插入或更新节点
if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key, value)) != null) {
oldVal = p.val;
// 如果不是 putIfAbsent 或者节点已存在,则更新值
if (!onlyIfAbsent)
p.val = value;
}
}
}
}
// 如果链表长度达到阈值,则转换为红黑树
if (binCount != 0) {
if (binCount >= TREEIFY_THRESHOLD)
treeifyBin(tab, i);
// 如果旧值存在,返回旧值
if (oldVal != null)
return oldVal;
break;
}
}
}
// 增加计数器
addCount(1L, binCount);
return null;
}
流程图
ConcurrentHashMap的优点
- 高并发性能: 提供比
Hashtable
和同步HashMap
更优的并发性能,尤其在多线程环境下读多写少的场景。 - 线程安全: 提供完全的线程安全而无需外部同步。
- 无锁读取: 读操作通常不需要加锁,可以直接访问数据。
- 扩展性: 在JDK 1.8中,通过动态调整内部结构和适应不同的并发情况,提高了扩展性。
ConcurrentHashMap的缺点
- 内存占用: 由于其内部结构,
ConcurrentHashMap
通常比普通的HashMap
占用更多的内存。 - 写操作开销: 虽然读操作非常快,但写操作由于涉及锁和可能的数据结构调整,其性能可能低于非线程安全的
HashMap
。 - 复杂性: 相比于简单的
HashMap
,ConcurrentHashMap
的实现更复杂,理解其内部工作机制也更有难度。
ConcurrentHashMap使用场景和特点
使用场景:
- 高并发环境: 在需要多个线程频繁读写共享数据的场景中,如缓存、共享配置数据等。
- Web服务器: 存储用户会话信息,其中对会话的读取远多于修改。
- 实时应用: 在实时计算中用于存储中间状态,需要快速响应和更新数据。
使用特点:
- 线程安全: 提供完整的线程安全支持,无需外部同步。
- 高并发性能: 通过分离锁(在JDK 1.7及之前)或使用CAS操作(在JDK 1.8及之后),允许多个线程同时读写不同段的数据。
- 键值对存储: 适用于需要键值对存储结构的场景。
- 动态扩容: 自动扩容以适应不断增长的数据量。
- 迭代器弱一致性: 迭代器反映创建时的状态,不会抛出
ConcurrentModificationException
。
测试代码示例
package com.dereksmart.crawling.future;
import java.util.HashMap;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
/**
* @Author derek_smart
* @Date 2024/8/7 8:06
* @Description ConcurrentHashMap测试类
*/
public class ConcurrentHashMapPerformanceTest {
private static final int THREAD_COUNT = 8; // 并发线程数量
private static final int NUMBER_OF_OPERATIONS = 100000; // 每个线程执行的操作数量
private final ConcurrentHashMap<Integer, Integer> map = new ConcurrentHashMap<>();
private final HashMap<Integer, Integer> map1 = new HashMap<>();
private final CountDownLatch startLatch = new CountDownLatch(1);
private final CountDownLatch endLatch = new CountDownLatch(THREAD_COUNT);
public void testPerformance() throws InterruptedException {
ExecutorService executor = Executors.newFixedThreadPool(THREAD_COUNT);
// 启动写入任务
for (int i = 0; i < THREAD_COUNT; i++) {
final int threadNum = i;
executor.submit(() -> {
try {
startLatch.await();
for (int j = 0; j < NUMBER_OF_OPERATIONS; j++) {
map.put(threadNum * NUMBER_OF_OPERATIONS + j, j);
map1.put(threadNum * NUMBER_OF_OPERATIONS + j, j);
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
} finally {
endLatch.countDown();
}
});
}
// 启动读取任务
for (int i = 0; i < THREAD_COUNT; i++) {
executor.submit(() -> {
try {
startLatch.await();
for (int j = 0; j < NUMBER_OF_OPERATIONS; j++) {
map.get(j);
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
} finally {
endLatch.countDown();
}
});
}
long startTime = System.nanoTime();
startLatch.countDown(); // 启动所有线程
endLatch.await(); // 等待所有线程完成
long endTime = System.nanoTime();
executor.shutdown();
executor.awaitTermination(1, TimeUnit.MINUTES);
long duration = TimeUnit.NANOSECONDS.toMillis(endTime - startTime);
System.out.println("Total time taken: " + duration + " ms");
System.out.println("Map size: " + map.size());
System.out.println("Map1 size: " + map1.size());
}
public static void main(String[] args) throws InterruptedException {
new ConcurrentHashMapPerformanceTest().testPerformance();
}
}
根据对比可以很好的发现ConcurrentHashMap在多线程中具有现场安全性,保证了数据准确性。
ConcurrentLinkedQueue
ConcurrentLinkedQueue的实现原理
ConcurrentLinkedQueue
是一个基于链接节点的无界线程安全队列,它实现了Queue
接口并使用了非阻塞算法。它在java.util.concurrent包中,是一个适用于高并发场景的队列。以下是其实现原理的简述:
-
非阻塞算法:
ConcurrentLinkedQueue
使用了Michael & Scott (M&S)算法,这是一个无锁的并发算法,它使用CAS(Compare-And-Swap)操作来管理其内部链表的头部和尾部。 -
单向链表结构: 队列的每个元素都是链表中的一个节点,每个节点通过
next
引用指向链表中的下一个节点。 -
头尾节点: 队列维护两个指针,分别指向头部和尾部的节点。入队操作在尾部进行,出队操作在头部进行。
-
CAS操作: 在添加或删除元素时,
ConcurrentLinkedQueue
使用CAS操作来确保节点的原子性更新,避免了使用锁,这样多个线程可以同时操作队列。 -
无锁的迭代器:
ConcurrentLinkedQueue
提供了弱一致性的迭代器,即迭代器不会反映出迭代器创建后的修改。
代码解读
public boolean offer(E e) {
checkNotNull(e); // 检查传入的元素e是否为null,如果是null则抛出异常。
final Node<E> newNode = new Node<E>(e); // 创建一个新的节点,其包含要添加的元素e。
for (Node<E> t = tail, p = t;;) { // 从尾节点开始,使用两个指针t和p。
Node<E> q = p.next; // 取得p指针指向的节点的下一个节点。
if (q == null) { // 如果q为null,表示p指向的节点是最后一个节点。
if (p.casNext(null, newNode)) { // 使用CAS操作尝试将newNode设置为p节点的下一个节点。
// 如果CAS操作成功,这个操作就是newNode变成队列一部分的线性化点
// 也是newNode变得“活跃”的点。
if (p != t) // 如果p不等于t,表示在执行过程中,尾节点已经向前移动了。
casTail(t, newNode); // 尝试使用CAS操作更新尾节点为newNode,失败也没关系。
return true; // 添加元素成功,返回true。
}
// 如果CAS操作失败,表示有其他线程在竞争添加元素,需要重新读取p节点的下一个节点。
}
else if (p == q) { // 如果p等于q,表示我们已经不在列表上了。
// 如果尾节点没有变化,它也会不在列表上,这种情况下我们需要跳到头节点,
// 因为所有活跃的节点都可以从头节点访问到。否则,新的尾节点是更好的选择。
p = (t != (t = tail)) ? t : head;
}
else {
// 如果p不等于t并且尾节点t有更新,那么使用新的尾节点t,否则使用q。
p = (p != t && t != (t = tail(linearization point),)) ? t : q;
}
}
}
ConcurrentLinkedQueue的优点
-
线程安全: 通过使用非阻塞算法,
ConcurrentLinkedQueue
提供了线程安全的队列操作,无需外部同步。 -
高并发性能: 由于无锁的设计,多个线程可以并发地进行入队和出队操作,这在高并发场景下提供了很好的性能。
-
无锁的迭代器: 迭代器的实现是弱一致性的,它可以与其他线程的入队和出队操作并发进行,而不会抛出
ConcurrentModificationException
。 -
无容量限制: 作为一个无界队列,
ConcurrentLinkedQueue
不会因为容量限制而阻塞入队操作。
ConcurrentLinkedQueue的缺点
-
内存占用: 每个元素都作为一个独立的对象存在,与它们的
next
引用一起,这会导致比数组基础的队列更多的内存开销。 -
高并发下的性能开销: 虽然无锁设计在大多数情况下效率很高,但在极端的高并发场景下,频繁的CAS操作可能会导致性能瓶颈。
-
弱一致性迭代器: 迭代器不保证展示队列的实时状态,这在某些需要强一致性迭代的场景中可能不是最佳选择。
-
难以确定当前大小: 由于队列的动态变化,计算队列的当前大小可能会很耗时,且结果可能在计算完成时已不再准确。
ConcurrentLinkedQueue
是为那些需要高并发入队和出队操作的场景设计的,它适用于生产者-消费者、工作窃取和其他并发设计模式。然而,如果对内存的使用有严格的要求,或者需要一个有界队列以限制资源使用,那么可能需要考虑其他类型的队列。
ConcurrentLinkedQueue使用场景和特点
使用场景:
- 生产者-消费者模型: 在多个线程生产和消费数据的场景中,如消息队列、事件处理等。
- 任务调度: 存储待处理的任务,如线程池的工作队列。
- 无阻塞算法: 适用于需要无阻塞算法来提高性能的场景。
使用特点:
- 无锁设计: 使用CAS操作实现无锁的并发控制,减少了锁竞争。
- 无界队列: 动态增长,理论上没有容量限制,适用于处理未知数量的元素。
- FIFO顺序: 保证元素的先进先出顺序。
- 高并发性能: 在多线程环境中提供良好的性能。
- 迭代器弱一致性: 迭代器提供了与修改操作无关的弱一致性视图。
测试代码示例
package com.dereksmart.crawling.future;
import java.util.LinkedList;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
/**
* @Author derek_smart
* @Date 2024/8/7 8:18
* @Description ConcurrentHashMap测试类
*/
public class ConcurrentLinkedQueuePerformanceTest {
private static final int THREAD_COUNT = 16; // 并发线程数量
private static final int NUMBER_OF_OPERATIONS = 500000; // 每个线程执行的操作数量
private final ConcurrentLinkedQueue<Integer> queue = new ConcurrentLinkedQueue<>();
private final CountDownLatch startLatch = new CountDownLatch(1);
private final CountDownLatch endLatch = new CountDownLatch(THREAD_COUNT);
public void testPerformance() throws InterruptedException {
ExecutorService executor = Executors.newFixedThreadPool(THREAD_COUNT);
// 启动入队(offer)任务
for (int i = 0; i < THREAD_COUNT / 2; i++) {
executor.submit(() -> {
try {
startLatch.await();
for (int j = 0; j < NUMBER_OF_OPERATIONS; j++) {
queue.offer(j);
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
} finally {
endLatch.countDown();
}
});
}
// 启动出队(poll)任务
for (int i = 0; i < THREAD_COUNT / 2; i++) {
executor.submit(() -> {
try {
startLatch.await();
for (int j = 0; j < NUMBER_OF_OPERATIONS; j++) {
queue.poll();
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
} finally {
endLatch.countDown();
}
});
}
long startTime = System.nanoTime();
startLatch.countDown(); // 启动所有线程
endLatch.await(); // 等待所有线程完成
long endTime = System.nanoTime();
executor.shutdown();
executor.awaitTermination(1, TimeUnit.MINUTES);
long duration = TimeUnit.NANOSECONDS.toMillis(endTime - startTime);
System.out.println("Total time taken for " + (THREAD_COUNT / 2 * NUMBER_OF_OPERATIONS) + " operations: " + duration + " ms"+queue.size());
}
public static void main(String[] args) throws InterruptedException {
new ConcurrentLinkedQueuePerformanceTest().testPerformance();
}
}
CopyOnWriteArrayList
CopyOnWriteArrayList的实现原理
CopyOnWriteArrayList
是一个线程安全的ArrayList
变体,它通过一种称为写时复制(copy-on-write)的技术来避免并发问题。以下是其实现原理的简述:
-
写时复制机制:
CopyOnWriteArrayList
在修改操作(如add
、set
、remove
等)时,不直接在当前数组上进行修改。相反,它会先创建当前数组的一个副本,然后在这个副本上执行修改操作,最后将原数组引用指向新的修改后的数组。 -
数组存储:所有元素都存储在一个内部的
volatile
数组中,确保数组内容的可见性。 -
锁的使用:在执行复制和修改操作时,会使用一个锁(通常是一个简单的对象锁)来保证操作的原子性。
-
迭代器弱一致性:
CopyOnWriteArrayList
的迭代器提供弱一致性,意味着它反映的是创建迭代器时数组的状态,而不是最新的状态。迭代器不会抛出ConcurrentModificationException
。
CopyOnWriteArrayList的优点
-
线程安全:提供线程安全的列表操作,无需额外的同步措施。
-
读操作高效:获取操作(如
get
)不需要加锁,因为它们操作的是不变的数组副本。 -
迭代器弱一致性:迭代器不会因为并发修改而失败,适合在遍历时进行写操作的场景。
-
无锁读取:读取操作不需要锁定,因此在读多写少的场景下性能很好。
CopyOnWriteArrayList的缺点
-
内存开销:由于写操作需要复制整个数组,所以可能会占用大量内存,尤其是当列表很大时。
-
写操作成本高:每次写操作都需要复制整个数组,这可能导致大量的系统调用和内存复制操作。
-
不适合写频繁的场景:如果有大量的写操作,
CopyOnWriteArrayList
的性能可能会大幅下降。 -
迭代器一致性问题:虽然迭代器提供弱一致性,但在某些场景下可能需要反映列表的最新状态。
CopyOnWriteArrayList使用场景和特点
使用场景:
- 读多写少: 在读操作远多于写操作的场景中,如配置管理、访问控制列表等。
- 事件监听器列表: 存储事件监听器,其中监听器的添加和移除不频繁,但可能在迭代时修改。
- 实时读取: 需要保证实时读取数据的一致性,而不需要实时反映写入的场景。
使用特点:
- 写时复制机制: 修改操作通过创建数组副本来实现,保证了写操作的安全性。
- 线程安全: 提供线程安全的列表操作,无需额外同步。
- 读操作无锁: 读取操作不需要加锁,提供了很好的读性能。
- 内存开销较大: 写操作需要复制整个数组,可能导致较大的内存开销。
- 迭代器弱一致性: 迭代器不会反映出迭代器创建之后的修改。
CopyOnWriteArrayList
非常适合读操作远多于写操作的场景,如事件监听器列表、缓存等。然而,如果应用程序需要频繁地修改列表,或者列表元素非常多,那么CopyOnWriteArrayList
可能不是一个好的选择,因为每次修改都会导致大量的内存复制,从而影响性能和资源使用。在这种情况下,可以考虑使用其他并发集合,如ConcurrentHashMap
的键集合或Collections.synchronizedList
等。
测试代码示例
package com.dereksmart.crawling.future;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
/**
* @Author derek_smart
* @Date 2024/8/7 8:39
* @Description CopyOnWriteArrayList测试类
*/
public class CopyOnWriteArrayListPerformanceTest {
private static final int THREAD_COUNT = 16; // 并发线程数量
private static final int NUMBER_OF_OPERATIONS = 10000; // 每个线程执行的操作数量
private final CopyOnWriteArrayList<Integer> list = new CopyOnWriteArrayList<>();
private final CountDownLatch startLatch = new CountDownLatch(1);
private final CountDownLatch endLatch = new CountDownLatch(THREAD_COUNT);
public void testPerformance() throws InterruptedException {
ExecutorService executor = Executors.newFixedThreadPool(THREAD_COUNT);
// 启动写入任务
for (int i = 0; i < THREAD_COUNT / 4; i++) {
executor.submit(() -> {
try {
startLatch.await();
for (int j = 0; j < NUMBER_OF_OPERATIONS; j++) {
list.add(j);
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
} finally {
endLatch.countDown();
}
});
}
// 启动读取任务
for (int i = 0; i < THREAD_COUNT - THREAD_COUNT / 4; i++) {
executor.submit(() -> {
try {
startLatch.await();
for (int j = 0; j < NUMBER_OF_OPERATIONS; j++) {
list.get(j % list.size());
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
} finally {
endLatch.countDown();
}
});
}
long startTime = System.nanoTime();
startLatch.countDown(); // 启动所有线程
endLatch.await(); // 等待所有线程完成
long endTime = System.nanoTime();
executor.shutdown();
executor.awaitTermination(1, TimeUnit.MINUTES);
long duration = TimeUnit.NANOSECONDS.toMillis(endTime - startTime);
System.out.println("Total time taken for " + (THREAD_COUNT * NUMBER_OF_OPERATIONS) + " operations: " + duration + " ms");
}
public static void main(String[] args) throws InterruptedException {
new CopyOnWriteArrayListPerformanceTest().testPerformance();
}
}
结论
这些并发集合的性能因使用场景而异。ConcurrentHashMap
的分段锁策略在保持高并发性能的同时,提供了足够的一致性保证。ConcurrentLinkedQueue
的非阻塞算法使其在高并发场景下表现出色。而CopyOnWriteArrayList
虽然写操作较慢,但在读多写少的场景下,其读操作的性能非常高。
在选择使用哪种并发集合时,应该根据实际的应用需求和预期的工作负载来做出决策。例如,如果预期写操作不频繁,但需要快速、一致的读取操作,CopyOnWriteArrayList
可能是一个合适的选择。相反,如果应用需要支持大量的并发写操作,那么ConcurrentHashMap
或ConcurrentLinkedQueue
可能更适合。。
转载自:https://juejin.cn/post/7399985328709681178