HikariPool源码(五)工作线程以及相关工具类
1.HikariPool中的工作线程调度
HikariPool中的工作线程都通过ThreadPoolExecutor来调度,一共有3个ThreadPoolExecutor实例,
ThreadPoolExecutor | 职责 | 超出负载后的处理策略 |
---|---|---|
houseKeepingExecutorService | 负责1.数据库连接池动态伸缩中减少数据库连接 2.监控数据库连接泄漏3.监控超过最大生命期的数据库连接 | 抛弃 |
addConnectionExecutor | 负责创建数据库连接,包括在数据库连接池动态伸缩时新增数据库连接。 | 抛弃 |
closeConnectionExecutor | 负责关闭数据库连接。 | 重复执行,直到成功 |
1.1. houseKeepingExecutorService
实例化:
//HikariPool.java
private ScheduledExecutorService initializeHouseKeepingExecutorService()
{
if (config.getScheduledExecutor() == null) {
final ThreadFactory threadFactory = Optional.ofNullable(config.getThreadFactory()).orElseGet(() -> new DefaultThreadFactory(poolName + " housekeeper", true));
final ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(1, threadFactory, new ThreadPoolExecutor.DiscardPolicy());
executor.setExecuteExistingDelayedTasksAfterShutdownPolicy(false);
executor.setRemoveOnCancelPolicy(true);
return executor;
}
else {
return config.getScheduledExecutor();
}
}
1.1.1 监控连接泄漏
//HikariPool.java
this.leakTaskFactory = new ProxyLeakTaskFactory(config.getLeakDetectionThreshold(), houseKeepingExecutorService);
//ProxyLeakTaskFactory.java
private ProxyLeakTask scheduleNewTask(PoolEntry poolEntry) {
ProxyLeakTask task = new ProxyLeakTask(poolEntry);
// executorService就是houseKeepingExecutorService
task.schedule(executorService, leakDetectionThreshold);
return task;
}
//ProxyLeakTask.java
ProxyLeakTask(final PoolEntry poolEntry)
{
this.exception = new Exception("Apparent connection leak detected");
this.threadName = Thread.currentThread().getName();
this.connectionName = poolEntry.connection.toString();
}
public void run()
{
isLeaked = true;
final StackTraceElement[] stackTrace = exception.getStackTrace();
final StackTraceElement[] trace = new StackTraceElement[stackTrace.length - 5];
System.arraycopy(stackTrace, 5, trace, 0, trace.length);
exception.setStackTrace(trace);
// 下面是监控到连接泄漏的处理,这里只是记录到日志中,如果通过一个接口处理,并可以让使用者动态实现会更灵活
LOGGER.warn("Connection leak detection triggered for {} on thread {}, stack trace follows", connectionName, threadName, exception);
}
1.1.2 连接池动态伸缩
//HikariPool.java
// HouseKeeper是负载连接池动态伸缩的工作线程
this.houseKeeperTask = houseKeepingExecutorService.scheduleWithFixedDelay(new HouseKeeper(), 100L, housekeepingPeriodMs, MILLISECONDS);
1.1.3 监控数据库连接最大生命期
final long maxLifetime = config.getMaxLifetime();
if (maxLifetime > 0) {
// variance up to 2.5% of the maxlifetime
final long variance = maxLifetime > 10_000 ? ThreadLocalRandom.current().nextLong( maxLifetime / 40 ) : 0;
final long lifetime = maxLifetime - variance;
poolEntry.setFutureEol(houseKeepingExecutorService.schedule(
() -> {
if (softEvictConnection(poolEntry, "(connection has passed maxLifetime)", false /* not owner */)) {
addBagItem(connectionBag.getWaitingThreadCount());
}
},
lifetime, MILLISECONDS));
}
1.2. addConnectionExecutor
实例化:
//HikariPool.java
this.addConnectionExecutor = createThreadPoolExecutor(addConnectionQueue, poolName + " connection adder", threadFactory, new ThreadPoolExecutor.DiscardPolicy());
//UtilityElf.java
public static ThreadPoolExecutor createThreadPoolExecutor(final BlockingQueue<Runnable> queue, final String threadName, ThreadFactory threadFactory, final RejectedExecutionHandler policy)
{
if (threadFactory == null) {
threadFactory = new DefaultThreadFactory(threadName, true);
}
ThreadPoolExecutor executor = new ThreadPoolExecutor(1 /*core*/, 1 /*max*/, 5 /*keepalive*/, SECONDS, queue, threadFactory, policy);
executor.allowCoreThreadTimeOut(true);
return executor;
}
添加连接:
//HikariPool.java
public void addBagItem(final int waiting)
{
final boolean shouldAdd = waiting - addConnectionQueue.size() >= 0; // Yes, >= is intentional.
if (shouldAdd) {
addConnectionExecutor.submit(poolEntryCreator);
}
}
// 连接词动态伸缩增加连接
private synchronized void fillPool()
{
final int connectionsToAdd = Math.min(config.getMaximumPoolSize() - getTotalConnections(), config.getMinimumIdle() - getIdleConnections())
- addConnectionQueue.size();
for (int i = 0; i < connectionsToAdd; i++) {
addConnectionExecutor.submit((i < connectionsToAdd - 1) ? poolEntryCreator : postFillPoolEntryCreator);
}
}
1.3. closeConnectionExecutor
实例化:
//HikariPool.java
this.closeConnectionExecutor = createThreadPoolExecutor(config.getMaximumPoolSize(), poolName + " connection closer", threadFactory, new ThreadPoolExecutor.CallerRunsPolicy());
//UtilityElf.java
public static ThreadPoolExecutor createThreadPoolExecutor(final int queueSize, final String threadName, ThreadFactory threadFactory, final RejectedExecutionHandler policy)
{
if (threadFactory == null) {
threadFactory = new DefaultThreadFactory(threadName, true);
}
LinkedBlockingQueue<Runnable> queue = new LinkedBlockingQueue<>(queueSize);
ThreadPoolExecutor executor = new ThreadPoolExecutor(1 /*core*/, 1 /*max*/, 5 /*keepalive*/, SECONDS, queue, threadFactory, policy);
executor.allowCoreThreadTimeOut(true);
return executor;
}
关闭连接:
void closeConnection(final PoolEntry poolEntry, final String closureReason)
{
if (connectionBag.remove(poolEntry)) {
final Connection connection = poolEntry.close();
closeConnectionExecutor.execute(() -> {
quietlyCloseConnection(connection, closureReason);
if (poolState == POOL_NORMAL) {
fillPool();
}
});
}
}
2.相关工具类
类 | 职责 |
---|---|
ThreadPoolExecutor | 线程执行器 |
BlockingQueue | 线程池所使用的缓冲队列,队列长度决定了能够缓冲的最大工作线程数量 |
ThreadFactory | 创建工作线程的线程工厂 |
ScheduledThreadPoolExecutor | 支持计划调度的线程池执行器,可以指定延迟执行和周期性执行。这样就可以设置延迟时间为最大生命期时间来监控数据库连接是否超过最大生命期 |
DefaultThreadFactory | HikariPool中实现的默认线程工厂,设置了线程名和将线程设置为精灵线程 |
RejectedExecutionHandler | 线程执行器中线程队列满时增加新线程的处理策略接口 |
DiscardOldestPolicy | 抛弃线程队列中最老的未执行工作线程,添加新的工作线程,在HikariPool中没有使用。 |
CallerRunsPolicy | 重复执行,直到成功,在closeConnectionExecutor中使用。 |
AbortPolicy | 抛弃超出线程队列负载的工作线程,并抛出异常。在HikariPool中没有使用。 |
DiscardPolicy | 忽略超出线程队列复杂的工作线程,不做任何处理。在houseKeepingExecutorService和houseKeepingExecutorService中使用。 |
3.核心类
3.1 ThreadPoolExecutor
构造器:
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
参数说明:
参数 | 说明 |
---|---|
int corePoolSize | 线程池执行器中要保持的最小线程数 |
int maximumPoolSize | 线程池执行器允许的最大线程数 |
long keepAliveTime | 未使用线程的保留时间,如果超过这个时间且线程数量大于最小线程数,则该线程会被释放 |
TimeUnit | 未使用线程的保留时间单位 |
BlockingQueue | 线程缓冲队列,这个队列的作用会受maximumPoolSize的影响,当线程数足够时不会放入队列。 |
ThreadFactory | 线程工厂接口,用于生成工作线程 |
RejectedExecutionHandler | 工作线程数量放入缓存队列超过缓存队列容量后的处理策略接口 |
这里需要注意的有几点:
- 线程缓冲队列应设置为有界队列,避免无限增大导致内存溢出。
- 最大线程数也应适当控制,避免设置为Integer.MAX_VALUE,原因同上。
- 线程缓冲队列的处理逻辑受corePoolSize和maximumPoolSize的影响,简单说就是当可用线程足够时,工作线程不会放入线程缓冲队列。
例子:
import java.util.concurrent.*;
import static java.util.concurrent.ThreadPoolExecutor.*;
public class ThreadPoolExecutorTest {
private static int runableNum = 1;
public static void main(String[] args) {
BlockingQueue<Runnable> queue = new LinkedBlockingQueue(3);
// 修改maximumPoolSize和maximumPoolSize的大小可以看到对queue处理逻辑的影响
ThreadPoolExecutor executor = new ThreadPoolExecutor(1, 3, 300, TimeUnit.SECONDS,
queue, new DefaultThreadFactory(), new DefaultDiscardPolicy());
while(true) {
System.out.println("runableNum: " + runableNum);
executor.execute(new DefaultRunnable("id-" + runableNum));
runableNum++;
quietlySleep(500);
}
}
private static void quietlySleep(long millis) {
try {
Thread.sleep(millis);
} catch (InterruptedException ie) {
Thread.currentThread().interrupt();
}
}
private static class DefaultRunnable implements Runnable {
private String name;
public DefaultRunnable(String name) {
this.name = name;
}
@Override
public void run() {
System.out.println("Runnable-" + name + " run.");
quietlySleep(3000);
}
public String getName() {
return this.name;
}
}
private static class DefaultDiscardPolicy extends DiscardPolicy {
@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
super.rejectedExecution(r, e);
if (r instanceof DefaultRunnable) {
DefaultRunnable defaultRunnable = (DefaultRunnable)r;
System.out.println("Runnable-" + defaultRunnable.getName() + " be discard.");
}
}
}
private static class DefaultThreadFactory implements ThreadFactory {
@Override
public Thread newThread(Runnable r) {
Thread thread = new Thread(r);
return thread;
}
}
}
输出:
runableNum: 1
Runnable-id-1 run.
runableNum: 2
runableNum: 3
runableNum: 4
runableNum: 5
Runnable-id-5 run.
runableNum: 6
Runnable-id-6 run.
Runnable-id-2 run.
runableNum: 7
runableNum: 8
Runnable-id-8 be discard.
runableNum: 9
Runnable-id-9 be discard.
runableNum: 10
Runnable-id-10 be discard.
Runnable-id-3 run.
runableNum: 11
Runnable-id-4 run.
runableNum: 12
Runnable-id-7 run.
runableNum: 13
runableNum: 14
Runnable-id-14 be discard.
3.2 ScheduledThreadPoolExecutor
public ScheduledFuture<?> schedule(Runnable command,
long delay,
TimeUnit unit) {
可以借用它的延迟执行线程的能力,来监控连接泄漏或者超过最大生命期。
4. 总结
- ThreadPoolExecutor和ScheduledThreadPoolExecutor相关类是提高线程执行性能的核心工具类,要善于利用。
- 充分利用线程工具管理资源池,合理安排工作线程。
end.
<--感谢三连击,左边点赞和关注。
Java极客站点: javageektour.com/
转载自:https://juejin.cn/post/6844904120940232712