【探究】Netty高性能FastThreadLocal凭什么敢叫Fast???
为什么Netty自己造了一个FastThreadLocal?
在FastThreadLocal
之前,我们通常使用JDK自带的ThreadLocal
来解决多线程环境下变量并发访问的冲突问题,ThreadLocal
通过为每个线程提供独立的变量副本,每个线程修改自己的变量副本,解决了并发访问,且线程安全。
但是,每个线程会拥有自己的ThreadLocalMap
,该ThreadLocalMap
维护了该线程使用到的所有ThreadLocal
,设置数据时该map
使用的是hash + 线性探测来解决冲突,所以,当线程内部拥有很多ThreadLocal
时,这个解决冲突的方式效率就低了起来。
FastThreadLocal如何Fast?
那么FastThreadLocal
凭什么在ThreadLocal
前面加上Fast
呢?
下面是两者三件套的对比
ThreadLocal | FastThreadLocal |
---|---|
ThreadLocal | FastThreadLocal |
ThreadLocalMap | InternalThreadLocalMap |
Thread | FastThreadLocalThread |
ThreadLocal
主要就是慢在ThreadLocalMap
采用线性探测法
解决 Hash冲突,性能较低。
而FastThreadLocal整体上就是采取的一个空间换时间
的方式,通过将Map转为数组
,每个FastThreadLocal在初始化时,会分配一个自增的index
从而避免hash
重提,并且在读写数据的时候通过数组下标 index
直接定位该到 FastThreadLocal 的位置,时间复杂度为 O(1)
。
FastThreadLocal源码
FastThreadLocal初始化
public class FastThreadLocal<V> {
private final int index;
public FastThreadLocal() {
index = InternalThreadLocalMap.nextVariableIndex();
}
}
public final class InternalThreadLocalMap extends UnpaddedInternalThreadLocalMap {
private static final AtomicInteger nextIndex = new AtomicInteger();
public static int nextVariableIndex() {
int index = nextIndex.getAndIncrement();
if (index >= ARRAY_LIST_CAPACITY_MAX_SIZE || index < 0) {
nextIndex.set(ARRAY_LIST_CAPACITY_MAX_SIZE);
throw new IllegalStateException("too many thread-local indexed variables");
}
return index;
}
}
FastThreadLocal在初始化的时候,会调用InternalThreadLocalMap的nextVariableIndex方法来获取到一个index,
而nextVariableIndex()使用过原子类AtomicInteger来进行分配,保证线程安全且index递增。
FastThreadLocal#get()方法
public class FastThreadLocal<V> {
@SuppressWarnings("unchecked")
public final V get() {
// 拿到当前线程的InternalThreadLocalMap
InternalThreadLocalMap threadLocalMap = InternalThreadLocalMap.get();
// 根据index直接定位元素
Object v = threadLocalMap.indexedVariable(index);
if (v != InternalThreadLocalMap.UNSET) {
return (V) v;
}
// 第一次,需要初始化
return initialize(threadLocalMap);
}
}
public final class InternalThreadLocalMap extends UnpaddedInternalThreadLocalMap {
private static final ThreadLocal<InternalThreadLocalMap> slowThreadLocalMap =
new ThreadLocal<InternalThreadLocalMap>();
public Object indexedVariable(int index) {
Object[] lookup = indexedVariables;
return index < lookup.length? lookup[index] : UNSET;
}
public static InternalThreadLocalMap get() {
Thread thread = Thread.currentThread();
if (thread instanceof FastThreadLocalThread) {
// 是FastThreadLocalThread
return fastGet((FastThreadLocalThread) thread);
} else {
// 不是FastThreadLocalThread
return slowGet();
}
}
// 是FastThreadLocalThread
private static InternalThreadLocalMap fastGet(FastThreadLocalThread thread) {
InternalThreadLocalMap threadLocalMap = thread.threadLocalMap();
// 如果为null,则进行初始化
if (threadLocalMap == null) {
thread.setThreadLocalMap(threadLocalMap = new InternalThreadLocalMap());
}
return threadLocalMap;
}
// 不是FastThreadLocalThread
private static InternalThreadLocalMap slowGet() {
InternalThreadLocalMap ret = slowThreadLocalMap.get();
// 如果为null,则进行初始化
if (ret == null) {
ret = new InternalThreadLocalMap();
slowThreadLocalMap.set(ret);
}
return ret;
}
}
FastThreadLocal的get方法,首先会调用InternalThreadLocalMap
的get
方法,拿到当前线程的InternalThreadLocalMap
在InternalThreadLocalMap
的get()
方法中,兼容了当前线程不是FastThreadLocalThread
的情况,如果是普通Thread
,则会从ThreadLocal
中获取
拿到InternalThreadLocalMap
后,调用indexedVariable(index)
方法,通过当前FastThreadLocal
所分配的index
,直接从数组中定位到元素返回。
如果元素为UNSET
,则说明是第一次,需要调用initialize()
进行初始化。
FastThreadLocal#initialize()
如果是第一次调用get
方法,即值是UNSET
时,需要进行初始化
初始化逻辑默认返回null
,可自定义实现逻辑
初始化完毕后,调用InternalThreadLocalMap#setIndexedVariable
,把值设置回去
// 初始化
private V initialize(InternalThreadLocalMap threadLocalMap) {
V v = null;
try {
v = initialValue();
} catch (Exception e) {
PlatformDependent.throwException(e);
}
threadLocalMap.setIndexedVariable(index, v);
// 把当前 FastThreadLocal 对象添加到待清理集合中
addToVariablesToRemove(threadLocalMap, this);
return v;
}
/**
* Returns the initial value for this thread-local variable.
*/
protected V initialValue() throws Exception {
// 默认返回null,可自定义实现
return null;
}
通过index
定位数组元素,并重新赋值;如果数组容量不够,则会进行扩容操作后,再赋值。
public boolean setIndexedVariable(int index, Object value) {
Object[] lookup = indexedVariables;
if (index < lookup.length) {
Object oldValue = lookup[index];
lookup[index] = value;
return oldValue == UNSET;
} else {
expandIndexedVariableTableAndSet(index, value);
return true;
}
}
private void expandIndexedVariableTableAndSet(int index, Object value) {
Object[] oldArray = indexedVariables;
final int oldCapacity = oldArray.length;
int newCapacity;
if (index < ARRAY_LIST_CAPACITY_EXPAND_THRESHOLD) {
newCapacity = index;
newCapacity |= newCapacity >>> 1;
newCapacity |= newCapacity >>> 2;
newCapacity |= newCapacity >>> 4;
newCapacity |= newCapacity >>> 8;
newCapacity |= newCapacity >>> 16;
newCapacity ++;
} else {
newCapacity = ARRAY_LIST_CAPACITY_MAX_SIZE;
}
Object[] newArray = Arrays.copyOf(oldArray, newCapacity);
Arrays.fill(newArray, oldCapacity, newArray.length, UNSET);
newArray[index] = value;
indexedVariables = newArray;
}
FastThreadLocal#addToVariablesToRemove
public class FastThreadLocal<V> {
private static final int variablesToRemoveIndex = InternalThreadLocalMap.nextVariableIndex();
private static void addToVariablesToRemove(InternalThreadLocalMap threadLocalMap, FastThreadLocal<?> variable) {
Object v = threadLocalMap.indexedVariable(variablesToRemoveIndex);
Set<FastThreadLocal<?>> variablesToRemove;
if (v == InternalThreadLocalMap.UNSET || v == null) {
// 初始化并set
variablesToRemove = Collections.newSetFromMap(new IdentityHashMap<FastThreadLocal<?>, Boolean>());
threadLocalMap.setIndexedVariable(variablesToRemoveIndex, variablesToRemove);
} else {
// 强转为Set<FastThreadLocal<?>>
variablesToRemove = (Set<FastThreadLocal<?>>) v;
}
// 将FastThreadLocal添加到待清除集合中
variablesToRemove.add(variable);
}
}
FastThreadLocal维护了一个静态的index,固定下标为0;
所以所有FastThreadLocalThread
的InternalThreadLocalMap
,下标为0
的空间存放的都是待清除的FastThreadLocal
,即FastThreadLocal集合。
FastThreadLocal#remove()、removeAll()
public final void remove() {
remove(InternalThreadLocalMap.getIfSet());
}
public static InternalThreadLocalMap getIfSet() {
Thread thread = Thread.currentThread();
if (thread instanceof FastThreadLocalThread) {
return ((FastThreadLocalThread) thread).threadLocalMap();
}
return slowThreadLocalMap.get();
}
public final void remove(InternalThreadLocalMap threadLocalMap) {
if (threadLocalMap == null) {
return;
}
Object v = threadLocalMap.removeIndexedVariable(index);
removeFromVariablesToRemove(threadLocalMap, this);
if (v != InternalThreadLocalMap.UNSET) {
try {
// 删除扩展操作
onRemoval((V) v);
} catch (Exception e) {
PlatformDependent.throwException(e);
}
}
}
protected void onRemoval(@SuppressWarnings("UnusedParameters") V value) throws Exception { }
public Object removeIndexedVariable(int index) {
Object[] lookup = indexedVariables;
if (index < lookup.length) {
Object v = lookup[index];
lookup[index] = UNSET;
return v;
} else {
return UNSET;
}
}
private static void removeFromVariablesToRemove(
InternalThreadLocalMap threadLocalMap, FastThreadLocal<?> variable) {
Object v = threadLocalMap.indexedVariable(variablesToRemoveIndex);
if (v == InternalThreadLocalMap.UNSET || v == null) {
return;
}
@SuppressWarnings("unchecked")
Set<FastThreadLocal<?>> variablesToRemove = (Set<FastThreadLocal<?>>) v;
variablesToRemove.remove(variable);
}
remove
操作,流程跟get
差不多,先获取当前线程的InternalThreadLocalMap
,根据FastThreadLocal
的index
定位,并将值设置为UNSET
达到删除操作。
并将当前FastThreadLocal从InternalThreadLocalMap的待删除Set集合中删除掉。
public static void removeAll() {
InternalThreadLocalMap threadLocalMap = InternalThreadLocalMap.getIfSet();
if (threadLocalMap == null) {
return;
}
try {
Object v = threadLocalMap.indexedVariable(variablesToRemoveIndex);
if (v != null && v != InternalThreadLocalMap.UNSET) {
@SuppressWarnings("unchecked")
Set<FastThreadLocal<?>> variablesToRemove = (Set<FastThreadLocal<?>>) v;
FastThreadLocal<?>[] variablesToRemoveArray =
variablesToRemove.toArray(new FastThreadLocal[0]);
for (FastThreadLocal<?> tlv: variablesToRemoveArray) {
tlv.remove(threadLocalMap);
}
}
} finally {
InternalThreadLocalMap.remove();
}
}
removeAll()
操作也是类似,只不过是把当前线程的全部FastThreadLocal
给删掉,直接通过获取index
为0的待删除Set集合,遍历删除即可,并且在最后如果该线程是FastThreadLocalThread
,则把当前线程的threadLocalMap
设置为null;如果是普通Thread,则是调用ThreadLocal
的remove
方法进行删除。
FastThreadLocal#set()
public final void set(V value) {
if (value != InternalThreadLocalMap.UNSET) {
InternalThreadLocalMap threadLocalMap = InternalThreadLocalMap.get();
setKnownNotUnset(threadLocalMap, value);
} else {
// 如果是要设置为UNSET,本质跟删除操作一样,所以走remove方法
remove();
}
}
private void setKnownNotUnset(InternalThreadLocalMap threadLocalMap, V value) {
if (threadLocalMap.setIndexedVariable(index, value)) {
addToVariablesToRemove(threadLocalMap, this);
}
}
public boolean setIndexedVariable(int index, Object value) {
Object[] lookup = indexedVariables;
if (index < lookup.length) {
Object oldValue = lookup[index];
lookup[index] = value;
// 第一次set,会返回true
return oldValue == UNSET;
} else {
expandIndexedVariableTableAndSet(index, value);
return true;
}
}
通过对上面源码的了解,再来看set()
方法也非常的容易,原理还是通过FastThreadLocal
的index
进行定位再设置value
如果是第一次set
,则会把当前FastThreadLocal
添加到待删除集合中去。
总结
每个FastThreadLocalThread
拥有自己的一个InternalThreadLocalMap
,InternalThreadLocalMap
内部通过数组维护该线程的所有FastThreadLocal
,每个FastThreadLocal
拥有独立的index
,结合数组,可直接根据index
定位,效率非常高。
InternalThreadLocalMap
内部数组下标为0的空间,维护当前线程所有FastThreadLocal集合,
当然,缺点也很明显,数组空间无法复用,且需要结合FastThreadLocalThread
使用,当一个线程的FastThreadLocal
被remove
了之后,它原本所占据的数组中的空间无法再次被使用。
整体上来看,FastThreadLocal
跟ThreadLocal
各有利弊,具体使用看业务场景。
转载自:https://juejin.cn/post/7306808476373286950