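Connection Pools: HTTP Connection Pooling
HTTP Persistent Connections (Keep-Alive)
HTTP/1.1 introduced persistent connections, also known as HTTP keep-alive, which allow one TCP connection to be reused for multiple requests over a period of time. Persistent connections are the default in HTTP/1.1; the following request header states the intent explicitly: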
Connection: Keep-Alive
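HTTP connection pools build on this mechanism to reuse connections.
HTTP Connection Pools
In the traditional approach, every HTTP request opens a new connection. Creating and tearing down connections is expensive, and under heavy concurrency it can become a performance bottleneck.
An HTTP connection pool maintains a set of reusable connections between the application and the HTTP server. This reduces connection setup and teardown overhead and improves the application's throughput and response time.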
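The HttpClient Connection Pool
The Apache HttpClient connection pool is a widely used tool for managing and reusing HTTP connections. The rest of this article uses it as an example to look at how a pool is implemented.
Using the HttpClient Connection Pool
Apache HttpClient manages pooled connections through PoolingHttpClientConnectionManager.
setMaxTotal() sets the maximum number of connections in the whole pool, and setDefaultMaxPerRoute() sets the default maximum number of connections per route.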
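import org.apache.http.client.config.RequestConfig;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.DefaultHttpRequestRetryHandler;
import org.apache.http.impl.client.HttpClients;
import org.apache.http.impl.conn.PoolingHttpClientConnectionManager;

public class HttpUtil {
    public static final CloseableHttpClient httpClient;

    static {
        // Timeouts in milliseconds: connect, lease from the pool, socket read
        RequestConfig config = RequestConfig.custom()
                .setConnectTimeout(5000)
                .setConnectionRequestTimeout(5000)
                .setSocketTimeout(5000)
                .build();
        // Create the pooling connection manager
        PoolingHttpClientConnectionManager cm = new PoolingHttpClientConnectionManager();
        // Maximum connections in total and per route
        cm.setMaxTotal(100);
        cm.setDefaultMaxPerRoute(20);
        httpClient = HttpClients.custom()
                // Use the pooling connection manager
                .setConnectionManager(cm)
                // Apply the default timeouts
                .setDefaultRequestConfig(config)
                // Retry up to 2 times; do not retry requests that were already sent
                .setRetryHandler(new DefaultHttpRequestRetryHandler(2, false))
                .build();
    }
}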
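To make the two limits concrete, here is a minimal sketch of the admission rule they imply. It mirrors the checks in the pool source examined later in this article: a new connection may be opened only while the route is below its per-route cap and the pool as a whole still has spare capacity. The names `PoolLimits` and `canOpenNew` are hypothetical and are not part of HttpClient.

```java
/** Hypothetical helper illustrating maxTotal vs. maxPerRoute; not HttpClient API. */
final class PoolLimits {
    private final int maxTotal;
    private final int maxPerRoute;

    PoolLimits(int maxTotal, int maxPerRoute) {
        this.maxTotal = maxTotal;
        this.maxPerRoute = maxPerRoute;
    }

    /**
     * @param routeAllocated connections (leased plus idle) already held for this route
     * @param totalLeased    connections currently leased across all routes
     * @return true if a new connection for this route may be opened
     */
    boolean canOpenNew(int routeAllocated, int totalLeased) {
        boolean routeHasRoom = routeAllocated < maxPerRoute;
        boolean poolHasRoom = maxTotal - totalLeased > 0;
        return routeHasRoom && poolHasRoom;
    }
}
```

With the configuration above (100 in total, 20 per route), a 21st concurrent request to the same route has to wait for a connection even though the pool as a whole is far from full.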
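Managing the HttpClient Connection Pool
We use HttpClient 4.5.14, the latest release of the 4.5.x line at the time of writing, to see how pool management is implemented.
Call Chain
Let's start with an ordinary HTTP call. The entry point is the execute() method of CloseableHttpClient. The overall call chain looks like this: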
public static void main(String[] args) throws IOException {
    HttpGet get = new HttpGet("http://www.baidu.com");
    // try-with-resources closes the response so its connection is always released
    try (CloseableHttpResponse response = HttpUtil.httpClient.execute(get)) {
        if (response.getStatusLine().getStatusCode() == 200) {
            HttpEntity resEntity = response.getEntity();
            String message = EntityUtils.toString(resEntity, "UTF-8");
            System.out.println(message);
        } else {
            System.out.println("Request failed");
        }
    }
}
The execute() method in turn calls doExecute():
// CloseableHttpClient#execute
@Override
public CloseableHttpResponse execute(
        final HttpUriRequest request,
        final HttpContext context) throws IOException, ClientProtocolException {
    Args.notNull(request, "HTTP request");
    return doExecute(determineTarget(request), request, context);
}
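doExecute() is implemented by two subclasses, InternalHttpClient and MinimalHttpClient. Which one is used depends on how the client was created; a client built as shown above is an InternalHttpClient.
InternalHttpClient's doExecute() goes on to call the execute() method of MainClientExec. That method leases a connection from the pool, executes the request, returns an HttpResponse object, and finally releases the connection.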
// MainClientExec#execute
@Override
public CloseableHttpResponse execute(
        final HttpRoute route,
        final HttpRequestWrapper request,
        final HttpClientContext context,
        final HttpExecutionAware execAware) throws IOException, HttpException {
    ....
    Object userToken = context.getUserToken();
    // Ask the connection manager for a connection request
    final ConnectionRequest connRequest = connManager.requestConnection(route, userToken);
    // Get the request configuration
    final RequestConfig config = context.getRequestConfig();
    final HttpClientConnection managedConn;
    try {
        final int timeout = config.getConnectionRequestTimeout();
        // Lease a connection from the connection pool; this blocks, bounded by the timeout
        managedConn = connRequest.get(timeout > 0 ? timeout : 0, TimeUnit.MILLISECONDS);
    } catch(final InterruptedException interrupted) {
        ...
    } catch(final ExecutionException ex) {
        ...
    }
    final ConnectionHolder connHolder = new ConnectionHolder(this.log, this.connManager, managedConn);
    try {
        ...
        HttpResponse response;
        for (int execCount = 1;; execCount++) {
            .....
            // Set the socket timeout
            final int timeout = config.getSocketTimeout();
            if (timeout >= 0) {
                managedConn.setSocketTimeout(timeout);
            }
            context.setAttribute(HttpCoreContext.HTTP_REQUEST, request);
            // Send the HTTP request
            response = requestExecutor.execute(request, managedConn, context);
            // The connection is in or can be brought to a re-usable state.
            if (reuseStrategy.keepAlive(response, context)) {
                // Set the idle duration of this connection
                final long duration = keepAliveStrategy.getKeepAliveDuration(response, context);
                ....
                connHolder.setValidFor(duration, TimeUnit.MILLISECONDS);
                connHolder.markReusable();
            } else {
                connHolder.markNonReusable();
            }
        }
        // check for entity, release connection if possible
        final HttpEntity entity = response.getEntity();
        if (entity == null || !entity.isStreaming()) {
            // connection not needed and (assumed to be) in re-usable state
            // Release the connection
            connHolder.releaseConnection();
            return new HttpResponseProxy(response, null);
        }
        return new HttpResponseProxy(response, connHolder);
    } catch (final Error error) {
        connManager.shutdown();
        throw error;
    }
}
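The keepAliveStrategy call above decides how long an idle connection may stay in the pool. To my understanding, the default strategy reads the `timeout` parameter of the response's `Keep-Alive` header and returns -1 (keep indefinitely) when it is absent. The following is a simplified sketch of that idea in plain Java, working on the raw header string; the class and method names are hypothetical and this is not the library's code.

```java
/** Hypothetical sketch: derive a keep-alive duration from a Keep-Alive header value. */
final class KeepAliveSketch {

    /** Returns the keep-alive duration in milliseconds, or -1 if no usable timeout is present. */
    static long keepAliveMillis(String keepAliveHeader) {
        if (keepAliveHeader == null) {
            return -1;
        }
        // The header is a comma-separated list of name=value elements.
        for (String element : keepAliveHeader.split(",")) {
            String[] kv = element.trim().split("=", 2);
            if (kv.length == 2 && kv[0].trim().equalsIgnoreCase("timeout")) {
                try {
                    // The header carries seconds; the pool works in milliseconds.
                    return Long.parseLong(kv[1].trim()) * 1000L;
                } catch (NumberFormatException ignore) {
                    // Malformed value: keep scanning, fall back to -1 below.
                }
            }
        }
        return -1;
    }
}
```

A server that answers with `Keep-Alive: timeout=5, max=100` therefore yields 5000 ms in this sketch.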
Acquiring a Connection
As the call chain shows, each HTTP request leases a connection from the pool. The actual call is requestConnection() in PoolingHttpClientConnectionManager:
// PoolingHttpClientConnectionManager#requestConnection
public ConnectionRequest requestConnection(
        final HttpRoute route,
        final Object state) {
    .....
    // Ask the pool for a Future of a CPoolEntry (a wrapper around the connection)
    final Future<CPoolEntry> future = this.pool.lease(route, state, null);
    return new ConnectionRequest() {
        @Override
        public boolean cancel() {
            return future.cancel(true);
        }
        // Calls leaseConnection(), passing in the Future<CPoolEntry>
        @Override
        public HttpClientConnection get(
                final long timeout,
                final TimeUnit timeUnit) throws InterruptedException, ExecutionException, ConnectionPoolTimeoutException {
            // Lease the connection
            final HttpClientConnection conn = leaseConnection(future, timeout, timeUnit);
            .....
            return conn;
        }
    };
}
Next comes leaseConnection(). It obtains the CPoolEntry (a wrapper around the connection) by calling get() on the future:
protected HttpClientConnection leaseConnection(
        final Future<CPoolEntry> future,
        final long timeout,
        final TimeUnit timeUnit) throws InterruptedException, ExecutionException, ConnectionPoolTimeoutException {
    final CPoolEntry entry;
    try {
        // Block on the future until an entry is available or the timeout expires
        entry = future.get(timeout, timeUnit);
        if (entry == null || future.isCancelled()) {
            throw new ExecutionException(new CancellationException("Operation cancelled"));
        }
        ....
        return CPoolProxy.newProxy(entry);
    } catch (final TimeoutException ex) {
        throw new ConnectionPoolTimeoutException("Timeout waiting for connection from pool");
    }
}
So we go straight to the lease() method of AbstractConnPool. lease() returns a Future, and the PoolEntry seen above is obtained by calling get() on that Future:
public E get(final long timeout, final TimeUnit timeUnit) throws InterruptedException, ExecutionException, TimeoutException {
    for (;;) {
        synchronized (this) {
            try {
                final E entry = entryRef.get();
                if (entry != null) {
                    return entry;
                }
                if (done.get()) {
                    throw new ExecutionException(operationAborted());
                }
                final E leasedEntry = getPoolEntryBlocking(route, state, timeout, timeUnit, this);
                // If the entry has been idle too long, validate it; on failure close and
                // release it, then try to lease another entry from the pool
                if (validateAfterInactivity > 0) {
                    if (leasedEntry.getUpdated() + validateAfterInactivity <= System.currentTimeMillis()) {
                        if (!validate(leasedEntry)) {
                            leasedEntry.close();
                            release(leasedEntry, false);
                            continue;
                        }
                    }
                }
                if (done.compareAndSet(false, true)) {
                    entryRef.set(leasedEntry);
                    done.set(true);
                    onLease(leasedEntry);
                    if (callback != null) {
                        callback.completed(leasedEntry);
                    }
                    return leasedEntry;
                } else {
                    release(leasedEntry, true);
                    throw new ExecutionException(operationAborted());
                }
            } catch (final IOException ex) {
                if (done.compareAndSet(false, true)) {
                    if (callback != null) {
                        callback.failed(ex);
                    }
                }
                throw new ExecutionException(ex);
            }
        }
    }
}
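The validateAfterInactivity check in the code above boils down to one predicate: an entry is validated before reuse only when the feature is enabled and the entry has not been updated for at least the threshold. A minimal sketch, with hypothetical class and method names and with the current time passed in so the behavior is easy to verify:

```java
/** Hypothetical sketch of the "validate after inactivity" decision. */
final class StaleCheckSketch {

    /**
     * @param lastUpdatedMillis             when the pool entry was last updated
     * @param validateAfterInactivityMillis threshold; values <= 0 disable validation
     * @param nowMillis                     current time
     * @return true if the entry should be validated before being handed out
     */
    static boolean needsValidation(long lastUpdatedMillis,
                                   long validateAfterInactivityMillis,
                                   long nowMillis) {
        return validateAfterInactivityMillis > 0
                && lastUpdatedMillis + validateAfterInactivityMillis <= nowMillis;
    }
}
```

In HttpClient 4.5 the threshold itself can be configured through PoolingHttpClientConnectionManager's setValidateAfterInactivity(int) setter.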
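The connection itself is obtained by getPoolEntryBlocking(). When the pool is full, this method blocks until a connection is released or the timeout expires. This is the key code: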
private E getPoolEntryBlocking(
        final T route, final Object state,
        final long timeout, final TimeUnit timeUnit,
        final Future<E> future) throws IOException, InterruptedException, ExecutionException, TimeoutException {
    // Compute the deadline from the timeout
    Date deadline = null;
    if (timeout > 0) {
        deadline = new Date (System.currentTimeMillis() + timeUnit.toMillis(timeout));
    }
    // Acquire the pool lock
    this.lock.lock();
    try {
        E entry;
        for (;;) {
            Asserts.check(!this.isShutDown, "Connection pool shut down");
            if (future.isCancelled()) {
                throw new ExecutionException(operationAborted());
            }
            // Look up the per-route pool in the map (key: route); create it if absent
            final RouteSpecificPool<T, C, E> pool = getPool(route);
            for (;;) {
                // Take an idle connection from the route's available list, preferring one
                // with the same state, and move it to the route's leased set
                entry = pool.getFree(state);
                // No idle connection: leave the loop and try to create one
                if (entry == null) {
                    break;
                }
                // Close the connection if it has expired
                if (entry.isExpired(System.currentTimeMillis())) {
                    entry.close();
                }
                if (entry.isClosed()) {
                    // Remove closed connections
                    this.available.remove(entry);
                    pool.free(entry, false);
                } else {
                    break;
                }
            }
            // A usable connection was found: move it from the global available list
            // to the global leased set and return it
            if (entry != null) {
                this.available.remove(entry);
                this.leased.add(entry);
                onReuse(entry);
                return entry;
            }
            // New connection is needed
            // Get the maximum number of connections for this route
            final int maxPerRoute = getMax(route);
            // Shrink the pool prior to allocating a new connection
            final int excess = Math.max(0, pool.getAllocatedCount() + 1 - maxPerRoute);
            // The route is already at or above its limit
            if (excess > 0) {
                for (int i = 0; i < excess; i++) {
                    // Take the connection at the tail of the available list, close and remove it
                    final E lastUsed = pool.getLastUsed();
                    if (lastUsed == null) {
                        break;
                    }
                    lastUsed.close();
                    this.available.remove(lastUsed);
                    pool.remove(lastUsed);
                }
            }
            // The route is below its limit
            if (pool.getAllocatedCount() < maxPerRoute) {
                final int totalUsed = this.leased.size();
                final int freeCapacity = Math.max(this.maxTotal - totalUsed, 0);
                if (freeCapacity > 0) {
                    final int totalAvailable = this.available.size();
                    // Total idle connections >= total remaining capacity
                    if (totalAvailable > freeCapacity - 1) {
                        // Remove the oldest idle connection from the available list and
                        // from its route's pool, and close it
                        final E lastUsed = this.available.removeLast();
                        lastUsed.close();
                        final RouteSpecificPool<T, C, E> otherpool = getPool(lastUsed.getRoute());
                        otherpool.remove(lastUsed);
                    }
                    // Create a new connection, add it to the route's pool and to the
                    // global leased set, and return it
                    final C conn = this.connFactory.create(route);
                    entry = pool.add(conn);
                    this.leased.add(entry);
                    return entry;
                }
            }
            // The pool is full; no connection can be allocated
            boolean success = false;
            try {
                pool.queue(future);
                // Add the request to the pending queue
                this.pending.add(future);
                // Block until signalled or until the deadline, if one is set
                if (deadline != null) {
                    success = this.condition.awaitUntil(deadline);
                } else {
                    this.condition.await();
                    success = true;
                }
                if (future.isCancelled()) {
                    throw new ExecutionException(operationAborted());
                }
            } finally {
                // In case of 'success', we were woken up by the
                // connection pool and should now have a connection
                // waiting for us, or else we're shutting down.
                // Just continue in the loop, both cases are checked.
                // Remove the request from the pending queue
                pool.unqueue(future);
                this.pending.remove(future);
            }
            // check for spurious wakeup vs. timeout
            // On timeout, leave the loop and throw the timeout exception below
            if (!success && (deadline != null && deadline.getTime() <= System.currentTimeMillis())) {
                break;
            }
        }
        throw new TimeoutException("Timeout waiting for connection");
    } finally {
        // Release the lock
        this.lock.unlock();
    }
}
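The core pattern in getPoolEntryBlocking() is: reuse an idle connection if there is one, create a new one if the limit allows, otherwise wait on a condition until a connection is released or the deadline passes. The sketch below reduces that to a single route using only the JDK. `TinyPool` is a hypothetical name, connections are modeled as integer ids, and expiry, validation, cancellation and per-route limits are deliberately left out.

```java
import java.util.ArrayDeque;
import java.util.Date;
import java.util.Deque;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

/** Hypothetical single-route pool; a "connection" is just an integer id. */
final class TinyPool {
    private final ReentrantLock lock = new ReentrantLock();
    private final Condition condition = lock.newCondition();
    private final Deque<Integer> available = new ArrayDeque<>();
    private final int max;
    private int allocated; // leased + idle connections created so far
    private int nextId;

    TinyPool(int max) {
        this.max = max;
    }

    /** Leases a connection; a timeout <= 0 means wait without a deadline. */
    Integer lease(long timeoutMillis) throws InterruptedException, TimeoutException {
        Date deadline = timeoutMillis > 0
                ? new Date(System.currentTimeMillis() + timeoutMillis)
                : null;
        lock.lock();
        try {
            for (;;) {
                Integer free = available.pollFirst();
                if (free != null) {
                    return free;                 // reuse an idle connection
                }
                if (allocated < max) {
                    allocated++;
                    return nextId++;             // create a new connection
                }
                // Pool is full: wait for a release or for the deadline.
                if (deadline != null) {
                    boolean signalled = condition.awaitUntil(deadline);
                    if (!signalled && deadline.getTime() <= System.currentTimeMillis()) {
                        throw new TimeoutException("Timeout waiting for connection");
                    }
                } else {
                    condition.await();
                }
                // Woken up (possibly spuriously): loop and re-check the pool state.
            }
        } finally {
            lock.unlock();
        }
    }

    /** Returns a connection to the pool and wakes up waiting threads. */
    void release(Integer conn) {
        lock.lock();
        try {
            available.addFirst(conn);
            condition.signalAll();
        } finally {
            lock.unlock();
        }
    }
}
```

Re-checking the pool state in a loop after every wakeup is what makes the wait robust against spurious wakeups, which is the same reason the original method wraps everything in `for (;;)`.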
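Releasing Connections
As the code above shows, connections are released in two places. The first is while leasing: if the leased connection turns out to be unusable, it is released. The second is after the HTTP request completes, when the current connection is released. Both cases call release() in AbstractConnPool: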
@Override
public void release(final E entry, final boolean reusable) {
    // Acquire the pool lock
    this.lock.lock();
    try {
        // Remove the connection from the global leased set
        if (this.leased.remove(entry)) {
            final RouteSpecificPool<T, C, E> pool = getPool(entry.getRoute());
            // Return the connection to its route's pool; if reusable, add it to the available list
            pool.free(entry, reusable);
            if (reusable && !this.isShutDown) {
                this.available.addFirst(entry);
                onRelease(entry);
            } else {
                entry.close();
            }
            // Take the first waiter from the pending queue and wake it up
            PoolEntryFuture<E> future = pool.nextPending();
            if (future != null) {
                this.pending.remove(future);
            } else {
                future = this.pending.poll();
            }
            if (future != null) {
                future.wakeup();
            }
        }
    } finally {
        // Release the lock
        this.lock.unlock();
    }
}
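One detail worth noting in the code shown so far: released connections are put at the head of the available list (addFirst), while eviction takes from the tail (removeLast, getLastUsed). Assuming reuse scans from the head, the most recently used connection is reused first and the longest-idle one is the first to be closed. A minimal sketch of that ordering, with a hypothetical class name and connections modeled as strings:

```java
import java.util.ArrayDeque;
import java.util.Deque;

/** Hypothetical sketch of the idle list ordering: reuse newest, evict oldest. */
final class LifoIdleList {
    private final Deque<String> available = new ArrayDeque<>();

    /** A released connection goes to the head of the list. */
    void release(String conn) {
        available.addFirst(conn);
    }

    /** Reuse takes the most recently released connection, or null if none is idle. */
    String reuse() {
        return available.pollFirst();
    }

    /** Eviction takes the connection that has been idle the longest, or null. */
    String evictOldest() {
        return available.pollLast();
    }
}
```

Reusing the freshest connection keeps the rest of the idle set untouched, so connections that are no longer needed age out naturally and can be closed.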
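Closing Connections
Connections are closed in two situations. When a connection is leased, it is checked for usability; an unusable connection is removed from the pool and closed. And when the number of connections exceeds the configured maximum, surplus idle connections are closed.
In other words, HttpClient cleans up invalid connections passively, only when a connection is requested. If the application may go a long time without sending requests, it is advisable to clean up idle connections periodically: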
public static class IdleConnectionMonitorThread extends Thread {
    private final HttpClientConnectionManager connMgr;
    private volatile boolean shutdown;

    public IdleConnectionMonitorThread(HttpClientConnectionManager connMgr) {
        super();
        this.connMgr = connMgr;
    }

    @Override
    public void run() {
        try {
            while (!shutdown) {
                synchronized (this) {
                    wait(5000);
                    // Close expired connections
                    connMgr.closeExpiredConnections();
                    // Optionally, close connections
                    // that have been idle longer than 30 sec
                    connMgr.closeIdleConnections(30, TimeUnit.SECONDS);
                }
            }
        } catch (InterruptedException ex) {
            // terminate
        }
    }

    public void shutdown() {
        shutdown = true;
        synchronized (this) {
            notifyAll();
        }
    }
}
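Conceptually, each sweep of the monitor thread walks the idle connections and closes those that have not been used for longer than the allowed idle time. The sketch below shows only that selection logic, with the clock passed in explicitly so it is deterministic. `IdleSweeperSketch` and its methods are hypothetical and do not reflect how the connection manager stores its entries.

```java
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical sketch of idle-connection eviction; connections are modeled as string ids. */
final class IdleSweeperSketch {
    // connection id -> time of last use, in milliseconds
    private final Map<String, Long> lastUsed = new LinkedHashMap<>();

    /** Records that a connection was used (or returned to the pool) at the given time. */
    void touch(String conn, long nowMillis) {
        lastUsed.put(conn, nowMillis);
    }

    /** Removes every connection idle for at least maxIdleMillis; returns how many were removed. */
    int closeIdle(long maxIdleMillis, long nowMillis) {
        int closed = 0;
        Iterator<Map.Entry<String, Long>> it = lastUsed.entrySet().iterator();
        while (it.hasNext()) {
            if (nowMillis - it.next().getValue() >= maxIdleMillis) {
                it.remove(); // a real pool would also close the underlying socket here
                closed++;
            }
        }
        return closed;
    }

    int size() {
        return lastUsed.size();
    }
}
```

As an alternative to a hand-written thread, HttpClientBuilder in 4.4 and later offers evictExpiredConnections() and evictIdleConnections(long, TimeUnit), which start a background eviction thread for the client; check the documentation of the version in use before relying on them.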
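When httpClient is no longer needed, be sure to call its close() method to release its connections and other resources.
Summary
Connection pooling is a core component of many HTTP client libraries. Built on persistent (keep-alive) connections, it reuses connections and so avoids repeated connection setup and handshakes. The benefit is greatest for workloads that send a large number of requests at high frequency.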
Reposted from: https://juejin.cn/post/7291135064838029327