likes
comments
collection
share

【探究】Netty高性能FastThreadLocal凭什么敢叫Fast???

作者站长头像
站长
· 阅读数 4

为什么Netty自己造了一个FastThreadLocal?

FastThreadLocal之前,我们通常使用JDK自带的ThreadLocal来解决多线程环境下变量并发访问的冲突问题,ThreadLocal通过为每个线程提供独立的变量副本每个线程修改自己的变量副本,解决了并发访问,且线程安全

但是,每个线程会拥有自己的ThreadLocalMap,该ThreadLocalMap维护了该线程使用到的所有ThreadLocal设置数据时map使用的是hash + 线性探测来解决冲突,所以,当线程内部拥有很多ThreadLocal时,这个解决冲突的方式效率就低了起来


FastThreadLocal如何Fast?

那么FastThreadLocal凭什么在ThreadLocal前面加上Fast呢?

下面是两者三件套的对比

ThreadLocalFastThreadLocal
ThreadLocalFastThreadLocal
ThreadLocalMapInternalThreadLocalMap
ThreadFastThreadLocalThread

ThreadLocal主要就是慢在ThreadLocalMap 采用线性探测法解决 Hash冲突,性能较低。

而FastThreadLocal整体上就是采取的一个空间换时间的方式,通过将Map转为数组,每个FastThreadLocal在初始化时,会分配一个自增的index从而避免hash重提,并且在读写数据的时候通过数组下标 index 直接定位该到 FastThreadLocal 的位置,时间复杂度为 O(1)


FastThreadLocal源码

FastThreadLocal初始化

public class FastThreadLocal<V> {
  private final int index;

  public FastThreadLocal() {
      index = InternalThreadLocalMap.nextVariableIndex();
  }
}


public final class InternalThreadLocalMap extends UnpaddedInternalThreadLocalMap {
  
  private static final AtomicInteger nextIndex = new AtomicInteger();
  
  public static int nextVariableIndex() {
    int index = nextIndex.getAndIncrement();
    if (index >= ARRAY_LIST_CAPACITY_MAX_SIZE || index < 0) {
      nextIndex.set(ARRAY_LIST_CAPACITY_MAX_SIZE);
      throw new IllegalStateException("too many thread-local indexed variables");
    }
    return index;
  }
  
}

FastThreadLocal在初始化的时候,会调用InternalThreadLocalMap的nextVariableIndex方法来获取到一个index,

而nextVariableIndex()使用过原子类AtomicInteger来进行分配,保证线程安全且index递增。


FastThreadLocal#get()方法

public class FastThreadLocal<V> {

  @SuppressWarnings("unchecked")
  public final V get() {
    // 拿到当前线程的InternalThreadLocalMap
    InternalThreadLocalMap threadLocalMap = InternalThreadLocalMap.get();
    
    // 根据index直接定位元素
    Object v = threadLocalMap.indexedVariable(index);
    if (v != InternalThreadLocalMap.UNSET) {
      return (V) v;
    }

    // 第一次,需要初始化
    return initialize(threadLocalMap);
  }
  
}

public final class InternalThreadLocalMap extends UnpaddedInternalThreadLocalMap {
  
  private static final ThreadLocal<InternalThreadLocalMap> slowThreadLocalMap =
            new ThreadLocal<InternalThreadLocalMap>();
  
  public Object indexedVariable(int index) {
    Object[] lookup = indexedVariables;
    return index < lookup.length? lookup[index] : UNSET;
  }
  
  public static InternalThreadLocalMap get() {
    Thread thread = Thread.currentThread();
    if (thread instanceof FastThreadLocalThread) {
      // 是FastThreadLocalThread
      return fastGet((FastThreadLocalThread) thread);
    } else {
      // 不是FastThreadLocalThread
      return slowGet();
    }
  }
  
  // 是FastThreadLocalThread
  private static InternalThreadLocalMap fastGet(FastThreadLocalThread thread) {
    InternalThreadLocalMap threadLocalMap = thread.threadLocalMap();
    // 如果为null,则进行初始化
    if (threadLocalMap == null) {
      thread.setThreadLocalMap(threadLocalMap = new InternalThreadLocalMap());
    }
    return threadLocalMap;
  }

  // 不是FastThreadLocalThread
  private static InternalThreadLocalMap slowGet() {
    InternalThreadLocalMap ret = slowThreadLocalMap.get();
    // 如果为null,则进行初始化
    if (ret == null) {
      ret = new InternalThreadLocalMap();
      slowThreadLocalMap.set(ret);
    }
    return ret;
  }
  
}

FastThreadLocal的get方法,首先会调用InternalThreadLocalMapget方法,拿到当前线程的InternalThreadLocalMap

InternalThreadLocalMapget()方法中,兼容了当前线程不是FastThreadLocalThread的情况,如果是普通Thread,则会从ThreadLocal中获取

拿到InternalThreadLocalMap后,调用indexedVariable(index) 方法,通过当前FastThreadLocal所分配的index,直接从数组中定位到元素返回。

如果元素为UNSET,则说明是第一次,需要调用initialize()进行初始化。


FastThreadLocal#initialize()

如果是第一次调用get方法,即值是UNSET时,需要进行初始化

初始化逻辑默认返回null,可自定义实现逻辑

初始化完毕后,调用InternalThreadLocalMap#setIndexedVariable,把值设置回去

// 初始化
private V initialize(InternalThreadLocalMap threadLocalMap) {
  V v = null;
  try {
    v = initialValue();
  } catch (Exception e) {
    PlatformDependent.throwException(e);
  }

  threadLocalMap.setIndexedVariable(index, v);
  // 把当前 FastThreadLocal 对象添加到待清理集合中
  addToVariablesToRemove(threadLocalMap, this);
  return v;
}

/**
 	* Returns the initial value for this thread-local variable.
 	*/
protected V initialValue() throws Exception {
  // 默认返回null,可自定义实现
  return null;
}

通过index定位数组元素,并重新赋值;如果数组容量不够,则会进行扩容操作后,再赋值。

public boolean setIndexedVariable(int index, Object value) {
  Object[] lookup = indexedVariables;
  if (index < lookup.length) {
    Object oldValue = lookup[index];
    lookup[index] = value;
    return oldValue == UNSET;
  } else {
    expandIndexedVariableTableAndSet(index, value);
    return true;
  }
}

private void expandIndexedVariableTableAndSet(int index, Object value) {
  Object[] oldArray = indexedVariables;
  final int oldCapacity = oldArray.length;
  int newCapacity;
  if (index < ARRAY_LIST_CAPACITY_EXPAND_THRESHOLD) {
    newCapacity = index;
    newCapacity |= newCapacity >>>  1;
    newCapacity |= newCapacity >>>  2;
    newCapacity |= newCapacity >>>  4;
    newCapacity |= newCapacity >>>  8;
    newCapacity |= newCapacity >>> 16;
    newCapacity ++;
  } else {
    newCapacity = ARRAY_LIST_CAPACITY_MAX_SIZE;
  }

  Object[] newArray = Arrays.copyOf(oldArray, newCapacity);
  Arrays.fill(newArray, oldCapacity, newArray.length, UNSET);
  newArray[index] = value;
  indexedVariables = newArray;
}

FastThreadLocal#addToVariablesToRemove

public class FastThreadLocal<V> {
  
  private static final int variablesToRemoveIndex = InternalThreadLocalMap.nextVariableIndex();

  private static void addToVariablesToRemove(InternalThreadLocalMap threadLocalMap, FastThreadLocal<?> variable) {
    Object v = threadLocalMap.indexedVariable(variablesToRemoveIndex);
    Set<FastThreadLocal<?>> variablesToRemove;
    if (v == InternalThreadLocalMap.UNSET || v == null) {
      // 初始化并set
      variablesToRemove = Collections.newSetFromMap(new IdentityHashMap<FastThreadLocal<?>, Boolean>());
      threadLocalMap.setIndexedVariable(variablesToRemoveIndex, variablesToRemove);
    } else {
      // 强转为Set<FastThreadLocal<?>>
      variablesToRemove = (Set<FastThreadLocal<?>>) v;
    }
		
    // 将FastThreadLocal添加到待清除集合中
    variablesToRemove.add(variable);
  }
}

FastThreadLocal维护了一个静态的index,固定下标为0;

所以所有FastThreadLocalThreadInternalThreadLocalMap,下标为0的空间存放的都是待清除的FastThreadLocal,即FastThreadLocal集合。


FastThreadLocal#remove()、removeAll()

public final void remove() {
  remove(InternalThreadLocalMap.getIfSet());
}

public static InternalThreadLocalMap getIfSet() {
  Thread thread = Thread.currentThread();
  if (thread instanceof FastThreadLocalThread) {
    return ((FastThreadLocalThread) thread).threadLocalMap();
  }
  return slowThreadLocalMap.get();
}


public final void remove(InternalThreadLocalMap threadLocalMap) {
  if (threadLocalMap == null) {
    return;
  }

  Object v = threadLocalMap.removeIndexedVariable(index);
  removeFromVariablesToRemove(threadLocalMap, this);

  if (v != InternalThreadLocalMap.UNSET) {
    try {
      // 删除扩展操作
      onRemoval((V) v);
    } catch (Exception e) {
      PlatformDependent.throwException(e);
    }
  }
}

protected void onRemoval(@SuppressWarnings("UnusedParameters") V value) throws Exception { }

public Object removeIndexedVariable(int index) {
  Object[] lookup = indexedVariables;
  if (index < lookup.length) {
    Object v = lookup[index];
    lookup[index] = UNSET;
    return v;
  } else {
    return UNSET;
  }
}

private static void removeFromVariablesToRemove(
  InternalThreadLocalMap threadLocalMap, FastThreadLocal<?> variable) {

  Object v = threadLocalMap.indexedVariable(variablesToRemoveIndex);

  if (v == InternalThreadLocalMap.UNSET || v == null) {
    return;
  }

  @SuppressWarnings("unchecked")
  Set<FastThreadLocal<?>> variablesToRemove = (Set<FastThreadLocal<?>>) v;
  variablesToRemove.remove(variable);
}

remove操作,流程跟get差不多,先获取当前线程的InternalThreadLocalMap,根据FastThreadLocalindex定位,并将值设置为UNSET达到删除操作。

并将当前FastThreadLocal从InternalThreadLocalMap的待删除Set集合中删除掉。

public static void removeAll() {
    InternalThreadLocalMap threadLocalMap = InternalThreadLocalMap.getIfSet();
    if (threadLocalMap == null) {
        return;
    }

    try {
        Object v = threadLocalMap.indexedVariable(variablesToRemoveIndex);
        if (v != null && v != InternalThreadLocalMap.UNSET) {
            @SuppressWarnings("unchecked")
            Set<FastThreadLocal<?>> variablesToRemove = (Set<FastThreadLocal<?>>) v;
            FastThreadLocal<?>[] variablesToRemoveArray =
                    variablesToRemove.toArray(new FastThreadLocal[0]);
            for (FastThreadLocal<?> tlv: variablesToRemoveArray) {
                tlv.remove(threadLocalMap);
            }
        }
    } finally {
        InternalThreadLocalMap.remove();
    }
}

removeAll()操作也是类似,只不过是把当前线程的全部FastThreadLocal给删掉,直接通过获取index为0的待删除Set集合,遍历删除即可,并且在最后如果该线程是FastThreadLocalThread,则把当前线程的threadLocalMap设置为null;如果是普通Thread,则是调用ThreadLocalremove方法进行删除。


FastThreadLocal#set()

public final void set(V value) {
  if (value != InternalThreadLocalMap.UNSET) {
    InternalThreadLocalMap threadLocalMap = InternalThreadLocalMap.get();
    setKnownNotUnset(threadLocalMap, value);
  } else {
    // 如果是要设置为UNSET,本质跟删除操作一样,所以走remove方法
    remove();
  }
}

private void setKnownNotUnset(InternalThreadLocalMap threadLocalMap, V value) {
  if (threadLocalMap.setIndexedVariable(index, value)) {
    addToVariablesToRemove(threadLocalMap, this);
  }
}

public boolean setIndexedVariable(int index, Object value) {
  Object[] lookup = indexedVariables;
  if (index < lookup.length) {
    Object oldValue = lookup[index];
    lookup[index] = value;
    // 第一次set,会返回true
    return oldValue == UNSET;
  } else {
    expandIndexedVariableTableAndSet(index, value);
    return true;
  }
}

通过对上面源码的了解,再来看set()方法也非常的容易,原理还是通过FastThreadLocalindex进行定位再设置value

如果是第一次set,则会把当前FastThreadLocal添加到待删除集合中去。


总结

每个FastThreadLocalThread拥有自己的一个InternalThreadLocalMapInternalThreadLocalMap内部通过数组维护该线程的所有FastThreadLocal,每个FastThreadLocal拥有独立的index结合数组,可直接根据index定位,效率非常高。

InternalThreadLocalMap内部数组下标为0的空间,维护当前线程所有FastThreadLocal集合,

当然,缺点也很明显,数组空间无法复用,且需要结合FastThreadLocalThread使用,当一个线程的FastThreadLocalremove了之后,它原本所占据的数组中的空间无法再次被使用。

整体上来看,FastThreadLocalThreadLocal各有利弊,具体使用看业务场景。