真实案例:多线程并发之Remove引发的通宵探讨Java多线程并发环境下的锁移除问题。详细介绍了如何通过引用计数安全地管
背景介绍
前一段时间项目出现,系统运行一段时间后,给定线程池100 个,总是只剩下2个线程,导致现场数据根本处理不过来。最后是忙了通宵进行一个个问题排查和代码分析,才找到对应问题发生点。
在一个多线程的环境中,当不同的线程需要访问共享资源时,为了维护数据的一致性和完整性,需要实现同步机制。Java提供了多种同步控制工具,如synchronized关键字、显式锁(如ReentrantLock
)等。在某些场景下,需要为每个唯一的资源创建一个锁对象,这样可以确保资源被安全地并发访问。然而,随着锁对象数量的增加,内存使用也会随之增加。因此,如何在确保线程安全的同时优化内存使用,成为了一个值得探讨的问题。
正文
根据真实项目业务需求,为了避免泄露公司代码,故此这边自我实现一套类似项目中使用的技术及出现问题的代码。
其代码如下所示:
package com.dereksmart.crawling.lock;
import org.springframework.util.StringUtils;
import java.io.Serializable;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Consumer;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;
/**
* @Author derek_smart
* @Date 2024/9/4 8:18
* @Description 多线程测试
*/
public class MultiThreadedTest {
private final Map<String, Lock> LOCK_MAP = new ConcurrentHashMap<>();
public int sendRobotTask(Apple apple) {
int a = 0;
try {
lock(apple);
Thread.sleep(200);
a = doSome(apple);
} catch (Exception e) {
} finally {
unLock(apple);
return a;
}
}
/**
* 释放锁
*
* @param demand
*/
private void lock(Apple demand) {
String lockKey = genLockKey(demand);
if (StringUtils.isEmpty(lockKey)) return;
Lock lock = LOCK_MAP.computeIfAbsent(lockKey, k -> new ReentrantLock());
lock.lock();
}
private String genLockKey(Apple demand) {
return demand.getA() + demand.getB();
}
/**
* 解锁
*
* @param apple
*/
private void unLock(Apple apple) {
String lockKey = genLockKey(apple);
if (StringUtils.isEmpty(lockKey)) return;
Lock lock = LOCK_MAP.get(lockKey);
if (Objects.isNull(lock)) return;
lock.unlock();
LOCK_MAP.remove(lockKey);
}
private int doSome(Apple demand) {
return 1;
}
class Apple implements Serializable {
private String a;
private String b;
public String getA() {
return a;
}
public void setA(String a) {
this.a = a;
}
public String getB() {
return b;
}
public void setB(String b) {
this.b = b;
}
}
public static void main(String[] args) {
ExecutorService executorService = Executors.newFixedThreadPool(100);
// 创建MultiThreadedTest实例
MultiThreadedTest multiThreadedTest = new MultiThreadedTest();
for (int i = 0; i < 9000; i++) {
// 创建Apple实例
MultiThreadedTest.Apple apple = multiThreadedTest.new Apple();
apple.setA("A" + i);
apple.setB("B" + i);
// 将任务提交给线程池
executorService.submit(() -> {
// 获取当前线程的ID和名称
Thread currentThread = Thread.currentThread();
long threadId = currentThread.getId();
String threadName = currentThread.getName();
// 打印线程ID和名称
System.out.println("线程ID: " + threadId + ", 线程名称: " + threadName);
int result = multiThreadedTest.sendRobotTask(apple);
// 处理结果...
});
}
// 关闭线程池
executorService.shutdown();
try {
// 等待所有任务完成,或者超时,或者当前线程被中断
if (executorService.awaitTermination(1, TimeUnit.HOURS)) {
// 所有任务完成后,打印线程池状态
ThreadPoolExecutor pool = (ThreadPoolExecutor) executorService;
// 获取活跃线程数
int activeCount = pool.getActiveCount();
System.out.println("当前活跃线程数: " + activeCount);
// 获取已完成任务数
long completedTaskCount = pool.getCompletedTaskCount();
System.out.println("已完成任务数: " + completedTaskCount);
// 获取线程池中当前的线程数
int poolSize = pool.getPoolSize();
System.out.println("线程池的线程数: " + poolSize);
// 获取当前线程池中最大允许的线程数
int maximumPoolSize = pool.getMaximumPoolSize();
System.out.println("线程池最大容量: " + maximumPoolSize);
// 获取线程池的核心线程数
int corePoolSize = pool.getCorePoolSize();
System.out.println("线程池的核心线程数: " + corePoolSize);
} else {
System.out.println("等待任务完成超时");
}
} catch (InterruptedException e) {
// 当前线程被中断,尝试立即关闭线程池
executorService.shutdownNow();
System.out.println("等待任务完成时被中断");
// 保留中断状态
Thread.currentThread().interrupt();
}
}
}
根据以上示例代码所示,通过模拟doSome模拟真实业务逻辑。最后就只剩下两个线程在处理这个doSome.
分析代码发现 问题出现在于解锁时候**LOCK_MAP.remove(lockKey);**出现了问题。
具体分析
MultiThreadedTest
类的sendRobotTask
方法中使用了一个ConcurrentHashMap
来存储锁对象,并且在lock
和unLock
方法中分别进行了锁定和解锁操作。然而,存在一个逻辑问题,可能会导致线程被永久阻塞(锁住)的情况。问题的关键在于unLock
方法中如何处理锁的释放和移除。这里是一个可能的问题场景:
1. 假设有两个线程几乎同时调用sendRobotTask
方法,并且传入具有相同lockKey
的Apple
对象。
2. 第一个线程进入lock
方法,获取锁并执行任务。
3. 第二个线程进入lock
方法,由于锁已被第一个线程持有,它将被阻塞,等待锁被释放。
4. 第一个线程完成任务,进入finally
块,调用unLock
方法来释放锁并从LOCK_MAP
中移除该锁对象。
5. 在第一个线程移除锁对象之后,第二个线程仍然在等待这个锁,但是当它最终获取到锁并尝试解锁时,由于锁对象已经从映射中被移除,它将无法解锁,导致线程被永久阻塞。
此外,unLock
方法中直接移除锁的操作是不安全的,因为它没有确保没有其他线程正在等待或持有该锁。如果锁被移除,任何后续尝试获取该锁的线程都将得到一个新的锁实例,这会破坏锁的一致性。
时序图分析
在这个流程图中,Thread1
和Thread2
代表两个并发执行的线程,LockMap
代表ConcurrentHashMap
,而LockObject
代表由LockMap
返回的Lock
对象。流程图展示了两个线程如何竞争同一个锁对象,并且Thread1
最终解锁并从映射中移除锁对象,而Thread2
在尝试解锁时失败,因为锁对象已经不存在。
关键问题在于,当一个线程从LockMap
中移除一个锁对象时,任何等待该锁的其他线程在获取锁之后将无法找到它来执行解锁操作。这就是导致无法解锁的关键所在。
解决方案
为了解决这个问题,需要确保锁的移除操作是安全的。一个简单的解决方案是在释放锁之后不移除它,因为ConcurrentHashMap
可以很好地处理大量的键值对。如果确实需要移除锁对象以节省内存,需要实现一个更复杂的引用计数机制或其他同步策略来确保安全地移除锁对象。
进一步解决-移除锁对象
private void locks(Apple demand) {
String lockKey = genLockKey(demand);
if (lockKey.isEmpty()) return;
LOCK_MAPS.compute(lockKey, (key, lockAndCount) -> {
if (lockAndCount == null) {
lockAndCount = new LockAndCount(new ReentrantLock(), new AtomicInteger(0));
}
lockAndCount.count.incrementAndGet();
return lockAndCount;
}).lock.lock();
}
private void unLocks(Apple apple) {
String lockKey = genLockKey(apple);
if (lockKey.isEmpty()) return;
LockAndCount lockAndCount = (LockAndCount) LOCK_MAP.get(lockKey);
if (lockAndCount == null) return;
lockAndCount.lock.unlock();
LOCK_MAPS.computeIfPresent(lockKey, (key, existingLockAndCount) -> {
if (existingLockAndCount.count.decrementAndGet() == 0) {
// No more references to the lock, can remove
return null;
}
return existingLockAndCount;
});
}
private final ConcurrentHashMap<String, LockAndCount> LOCK_MAPS = new ConcurrentHashMap<>();
class LockAndCount {
final Lock lock;
final AtomicInteger count;
LockAndCount(Lock lock, AtomicInteger count) {
this.lock = lock;
this.count = count;
}
}
创建了一个新的内部类LockAndCount
,它包含一个ReentrantLock
和一个AtomicInteger
。AtomicInteger
用于跟踪每个锁的引用计数。当第一次创建一个锁时,引用计数设置为0,并在每次lock
方法被调用时递增。相应地,在每次unlock
方法被调用时递减。
在unLock
方法中 ,使用了ConcurrentHashMap
的computeIfPresent
方法来原子地更新映射。如果引用计数降至0,就从映射中移除锁对象。这保证了只有当没有线程持有锁时,锁对象才会被移除。
请注意,为了避免潜在的死锁,lock
和unlock
方法都应该在try-finally
块中使用,以确保锁总是被释放。在上述代码中简化了异常处理,实际应用中应该根据需要添加适当的异常处理逻辑。
结论
通过本文的分析和代码实现,展示了在Java中如何安全地管理锁资源,并通过引用计数等技术优化内存使用。强调了在多线程编程中理解并发控制的重要性,并提供了实用的解决方案和最佳实践。
代码很坑,且行且珍惜。
本文皆为个人原创,请尊重创作,未经许可不得转载。
转载自:https://juejin.cn/post/7415662205615620147