啃透Java并发-LockSupport源码详解
Java1.5加入的JUC并发包,就像一把好用的瑞士军刀,极大的丰富了Java处理并发的手段,但JUC并不简单,有一定的学习成本,我曾经也断断续续看过一些JUC的实现源码,但是既不系统也不够深入,这次决定重新出发,重新拜读大师Doug Lea的神作,所以自己也是抱着以学代练的心态,记录自己的学习心得,难免有理解不到位的地方,大家轻喷哈。
为什么要阅读源码
不知道你有没有这样的感觉,在使用JUC中提供的工具类处理并发时,有一种死记硬背的感觉,比如LockSupport应该怎么用,CountDownLatch能干嘛,但并不清楚其实现原理,只知道how不知道why,这种状态有二个比较大的问题。
- 死记硬背,比较容易遗忘,在工作中使用容易挖坑,风险大
- 对JUC的认识不够深入,知识不能够形成体系,难以融会贯通,灵活运用
那要深入,最直接有效的办法就是阅读源码!
为什么要先解析LockSupport
我们知道JUC看似有很多类,结构错综复杂,但是如果要从中挑出最重要的一个类,那一定是队列同步器AbstractQueuedSynchronizer, 而AbstractQueuedSynchronizer又是利用LockSupport来控制线程的状态,从而达到线程在等待唤醒之间切换的目的。而我们处理并发,重点就是管理线程的状态,所以理解LockSupport是很重要的一个基础。
LockSupoort的简单使用
先来看一个简单的例子
public static void main(String[] args) {
Thread worker = new Thread(() -> {
LockSupport.park();
System.out.println("start work");
});
worker.start();
System.out.println("main thread sleep");
try {
Thread.sleep(500);
} catch (InterruptedException e) {
e.printStackTrace();
}
LockSupport.unpark(worker);
try {
worker.join();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
// 最终控制台输出结果
main thread sleep
start work
启动一个worker线程,主线程先sleep 500ms,worker线程因为调用了LockSupport的park,会等待,直到主线程sleep结束,调用unpark唤醒worker线程。那么在JUC之前,我们常用的让线程等待的方法如下
Object monitor = new Object();
synchronized (monitor) {
try {
monitor.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
主要有三点区别
- LockSupport.park和unpark不需要在同步代码块中,wait和notify是需要的。
- LockSupport的pork和unpark是针对线程的,而wait和notify是可以是任意对象。
- LockSupport的unpark可以让指定线程被唤醒,但是notify是随机唤醒一个,notifyAll是全部唤醒,不够灵活。
LockSupport源码解读
前面只是铺垫,现在来到我们的主菜,解读LockSupport的park和unpark方法,当然还有一些其他类似的重载方法,如parkUntil,parkNanos,它们的大体原理类似,感兴趣大家可以自行查阅源码。
这篇文章以及后续的文章,分析的源码都基于Open Jdk 8。
LockSupport.unpark
为什么先讲unpark方法,因为unpark代码量少一些,相对简单,柿子先捡软的捏-。-
//java.util.concurrent.locks.LockSupport.java
public static void unpark(Thread thread) {
if (thread != null)
UNSAFE.unpark(thread);
}
//
UNSAFE = sun.misc.Unsafe.getUnsafe();
参数thread是我们要唤醒的目标线程,先判空,然后调用UNSAFE.unpark,UNSAFE是Unsafe对象,不要被这个名字吓到,这个类提供了很多有用的方法,之前的文章也有提到过,比如获取类对象中属性的内存偏移地址,还有 CAS操作等。但是这个Unsafe对象必须使用反射得到然后才能正常使用,因为getUnsafe方法有判断当前类加载器是不是BootStrapClassLoader。我们继续查看Unsafe类unpark的实现。
// Unsafe.java
public native void unpark(Object thread);
可以看到unpark是一个native方法,它的native实现是在 hotspot\src\share\vm\prims\unsafe.cpp 看下代码实现,
UNSAFE_ENTRY(void, Unsafe_Unpark(JNIEnv *env, jobject unsafe, jobject jthread))
UnsafeWrapper("Unsafe_Unpark");
// 声明一个Parker对象p,它是真正干活的对象
Parker* p = NULL;
if (jthread != NULL) {
// 根据传入的jthread对象,来获取native层的oopDesc*对象,oop是oopDesc* 的宏定义
oop java_thread = JNIHandles::resolve_non_null(jthread);
if (java_thread != NULL) {
// 获取java_thread对象中_park_event_offset的值,该值就是Parker对象的地址
jlong lp = java_lang_Thread::park_event(java_thread);
if (lp != 0) {
// 如果地址有效,直接转为Parker指针
p = (Parker*)addr_from_java(lp);
} else {
// 如果地址无效
MutexLocker mu(Threads_lock);
java_thread = JNIHandles::resolve_non_null(jthread);
if (java_thread != NULL) {
// 转为native层的JavaThread对象
JavaThread* thr = java_lang_Thread::thread(java_thread);
if (thr != NULL) {
// 将JavaThread的成员变量_parker赋值给p
p = thr->parker();
if (p != NULL) { // Bind to Java thread for next time.
// 将p的地址赋值给_park_event_offset,下次获取时可用
java_lang_Thread::set_park_event(java_thread, addr_to_java(p));
}
}
}
}
}
}
if (p != NULL) {
// 这个USDT2的宏,暂时我也不清楚是干啥的,不过不影响我们的分析,我们先忽略
#ifndef USDT2
HS_DTRACE_PROBE1(hotspot, thread__unpark, p);
#else /* USDT2 */
HOTSPOT_THREAD_UNPARK(
(uintptr_t) p);
#endif /* USDT2 */
// 真正干货的方法,调用了Parker的unpark方法
p->unpark();
}
根据上面的代码,我们需要知道二个native层的类,JavaThread类和Parker类
class JavaThread: public Thread {
private:
JavaThread* _next; // The next thread in the Threads list
oop _threadObj; // The Java level thread object
// 省略代码...
private:
Parker* _parker;
public:
Parker* parker() { return _parker; }
// 省略代码...
JavaThread类很长,这里只列出几个成员变量,现在只需要知道它是native层的Thread,成员变量_threadObj是Java层的thread对象,通过它native层可以调用Java层的代码。我们继续重点看下Parker类的实现。
class Parker : public os::PlatformParker {
private:
volatile int _counter ;
// 省略代码...
}
我们重点关注_counter字段,可以简单理解为_counter字段 > 0时,可以通行,即park方法会直接返回,另外park方法返回后,_counter会被赋值为0,unpark方法可以将_counter置为1,并且唤醒当前等待的线程。
可以看到Parker的父类是os::PlatformParker,那这个类又是干嘛的呢?这里先插个题外话, 我们都知道,Java是跨平台的,我们在应用层定义的Thread肯定依赖于具体的平台,不同的平台有不同实现,比如Linux是一套代码,Windows是另外一套,那我们就能理解了,PlatformParker根据平台有不同的实现。在OpenJdk8的实现中支持5个平台
- aix
- bsd
- linux
- solaris
- windows
我们知道Linux是现在使用比较广泛的操作系统,比如熟知的Android是基于Linux内核,所以这里我们就挑选Linux来分析吧。对应的文件路径hotspot\src\os\linux\vm\os_linux.cpp
void Parker::unpark() {
int s, status ;
// 先进入_mutex的临界区,声明如下
// pthread_mutex_t _mutex [1] ;
// pthread_cond_t _cond [2] ;
status = pthread_mutex_lock(_mutex);
assert (status == 0, "invariant") ;
s = _counter;
// 将_counter置为1
_counter = 1;
// s记录的是unpark之前的_counter数,如果s < 1,说明有可能该线程在等待状态,需要唤醒。
if (s < 1) {
// thread might be parked
// _cur_index代表被使用cond的index
if (_cur_index != -1) {
// thread is definitely parked
// 根据虚拟机参数WorkAroundNPTLTimedWaitHang来做不同的处理,默认该参数是1
if (WorkAroundNPTLTimedWaitHang) {
// 先唤醒目标等待的线程
status = pthread_cond_signal (&_cond[_cur_index]);
assert (status == 0, "invariant");
// 释放互斥锁,先唤醒后释放锁,可能会导致线程被唤醒后获取不到锁,再次进入等待状态,我的理解是效率可能会低一丢丢
status = pthread_mutex_unlock(_mutex);
assert (status == 0, "invariant");
} else {
// 先释放锁
status = pthread_mutex_unlock(_mutex);
assert (status == 0, "invariant");
// 后发信号唤醒线程,唤醒操作在互斥代码块外部,感觉这里可能会有风险,暂时还GET不到。。。
status = pthread_cond_signal (&_cond[_cur_index]);
assert (status == 0, "invariant");
}
} else {
// 如果线程没有在等待,直接返回
pthread_mutex_unlock(_mutex);
assert (status == 0, "invariant") ;
}
} else {
// 如果线程没有在等待,直接返回
pthread_mutex_unlock(_mutex);
assert (status == 0, "invariant") ;
}
}
代码不多,都加了注释,总体来说就是根据Park类的成员变量_counter来做加锁解锁和唤醒操作,在Linux平台, 加锁用的pthread_mutex_lock,解锁是pthread_mutex_unlock,唤醒是pthread_cond_signal 。接下来解析LockSupport的park方法。
LockSupport.park
先看下Java层park方法的实现
public static void park() {
UNSAFE.park(false, 0L);
}
Unsafe中的实现
public native void park(boolean var1, long var2);
仍然是一个native方法,我们继续跟进去看下
UNSAFE_ENTRY(void, Unsafe_Park(JNIEnv *env, jobject unsafe, jboolean isAbsolute, jlong time))
// 省略代码...
thread->parker()->park(isAbsolute != 0, time);
// 省略代码...
省略了非关键代码,重点是park方法,这个thread我们前面已经遇到过,就是native层的JavaThread对象,然后调用Parker的park方法,继续跟进去linux平台的os_linux.cpp的实现
void Parker::park(bool isAbsolute, jlong time) {
// 先原子的将_counter的值设为0,并返回_counter的原值,如果原值>0说明有通行证,直接返回
if (Atomic::xchg(0, &_counter) > 0) return;
Thread* thread = Thread::current();
assert(thread->is_Java_thread(), "Must be JavaThread");
JavaThread *jt = (JavaThread *)thread;
// 判断线程是否已经被中断
if (Thread::is_interrupted(thread, false)) {
return;
}
// Next, demultiplex/decode time arguments
timespec absTime;
// park方法的传参是isAbsolute = false, time = 0,所以会继续往下走
if (time < 0 || (isAbsolute && time == 0) ) { // don't wait at all
return;
}
// 这里time为0,如果调用的是parkNanos或者parkUtil,这里time就会>0,
if (time > 0) {
// 如果time > 0,unpackTime计算absTime的时间
unpackTime(&absTime, isAbsolute, time);
}
ThreadBlockInVM tbivm(jt);
// 再次判断线程是否被中断,如果没有被中断,尝试获得互斥锁,如果获取失败,直接返回
if (Thread::is_interrupted(thread, false) || pthread_mutex_trylock(_mutex) != 0) {
return;
}
int status ;
// 如果_counter > 0, 不需要等待,这里再次检查_counter的值
if (_counter > 0) { // no wait needed
_counter = 0;
status = pthread_mutex_unlock(_mutex);
assert (status == 0, "invariant") ;
// 插入一个写内存屏障,保证可见性,具体实现见下方
OrderAccess::fence();
return;
}
// 省略assert代码
OSThreadWaitState osts(thread->osthread(), false /* not Object.wait() */);
// 设置JavaThread的_suspend_equivalent为true,表示线程被暂停
jt->set_suspend_equivalent();
// cleared by handle_special_suspend_equivalent_condition() or java_suspend_self()
assert(_cur_index == -1, "invariant");
if (time == 0) {
_cur_index = REL_INDEX; // arbitrary choice when not timed
// 让线程等待_cond[_cur_index]信号,到这里线程进入等待状态
status = pthread_cond_wait (&_cond[_cur_index], _mutex) ;
} else {
_cur_index = isAbsolute ? ABS_INDEX : REL_INDEX;
// 线程进入有超时时间的等待,内部实现调用了pthread_cond_timedwait系统调用
status = os::Linux::safe_cond_timedwait (&_cond[_cur_index], _mutex, &absTime) ;
if (status != 0 && WorkAroundNPTLTimedWaitHang) {
pthread_cond_destroy (&_cond[_cur_index]) ;
pthread_cond_init (&_cond[_cur_index], isAbsolute ? NULL : os::Linux::condAttr());
}
}
_cur_index = -1;
// 省略assert代码
// _counter重新设置为0
_counter = 0 ;
// 释放互斥锁
status = pthread_mutex_unlock(_mutex) ;
assert_status(status == 0, status, "invariant") ;
// 插入写屏障
OrderAccess::fence();
// 省略额外检查代码
}
// OrderAccess::fence 在linux平台的实现
inline void OrderAccess::fence() {
// 如果是多核cpu
if (os::is_MP()) {
// always use locked addl since mfence is sometimes expensive
// 判断是否AMD64 CPU,汇编代码实现写屏障
#ifdef AMD64
__asm__ volatile ("lock; addl $0,0(%%rsp)" : : : "cc", "memory");
#else
__asm__ volatile ("lock; addl $0,0(%%esp)" : : : "cc", "memory");
#endif
}
}
上面分析了park的实现原理,有了前面unpark方法分析的知识铺垫,park方法应该很容易看懂。
收获和总结
通过LockSupport的源码阅读,可以总结出一下几点
- native层的JavaThread通过Parker的_counter来表示通行证,>0表示可以通行,如果_counter=0,调用park线程会等待,直到被unpark唤醒,如果先调用unpack,再调用park会直接返回,并消费掉_counter(设置为0)。
- Linux平台,线程等待和唤醒,加锁用的pthread_mutex_lock,解锁是pthread_mutex_unlock,唤醒是pthread_cond_signal,了解到这些心里就有数了,知其然知其所以然,何其快哉!
最后,还是想提一下Java层关于线程状态的小知识,可能有些同学会不是特别清楚,所以还是做个总结。 Java线程状态有以下6种。
- NEW (新建的线程,还没有调用start)
- RUNNABLE (调用了start,正在运行,或者在等待操作系统的调度,分配CPU时间片)
- BLOCKED (synchronied,等待monitorEnter)
- WAITING (wait,LockSupport.park 会进入该状态)
- TIMED_WAITING (带等待时间的wait, LockSupport.parkNanos, parkUtil)
- TERMINATED (线程执行结束)
关于并发,我们软件工程师要做的,就是控制线程在这几个状态间正确转换,所谓“工欲善其事,必先利其器”,JDK提供的各种并发工具类,我们只有深入了解它们,才能灵活高效的运用,这也是我记录"啃透Java并发"系列文章的初心,与君共勉!
转载自:https://juejin.cn/post/6844903938202796039