
Connection Pools: HTTP Connection Pool

Author: 站长 (site admin)

HTTP Persistent Connections (Keep-Alive)

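HTTP persistent connections, also called HTTP keep-alive, let a single connection be reused for multiple requests over a period of time instead of being closed after each response. In HTTP/1.0 a client had to request this explicitly with the header below; HTTP/1.1 made persistent connections the default, so a connection stays open unless one side sends Connection: close.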

Connection: Keep-Alive

HTTP connection pools rely on this same mechanism to reuse connections.
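As a concrete illustration, the following sketch encodes the persistence rule from RFC 7230 section 6.3 as a client might apply it: a close option always ends the connection, HTTP/1.1 is persistent by default, and HTTP/1.0 is persistent only when keep-alive is requested. The class and method names are hypothetical (not from any library), and the sketch ignores proxies and versions other than HTTP/1.0 and HTTP/1.1.

```java
import java.util.Locale;

// Hypothetical helper: a simplified version of the RFC 7230 section 6.3 persistence rule.
final class KeepAliveRule {
    private KeepAliveRule() {}

    /**
     * @param httpVersion      "HTTP/1.0" or "HTTP/1.1"
     * @param connectionHeader value of the Connection header, or null if absent
     * @return true if the connection may stay open after this message
     */
    static boolean isPersistent(String httpVersion, String connectionHeader) {
        boolean close = false;
        boolean keepAlive = false;
        if (connectionHeader != null) {
            // Connection is a comma-separated list of case-insensitive options
            for (String token : connectionHeader.split(",")) {
                String option = token.trim().toLowerCase(Locale.ROOT);
                if (option.equals("close")) {
                    close = true;
                } else if (option.equals("keep-alive")) {
                    keepAlive = true;
                }
            }
        }
        if (close) {
            return false;                 // "close" always ends the connection
        }
        if ("HTTP/1.1".equals(httpVersion)) {
            return true;                  // HTTP/1.1: persistent by default
        }
        return keepAlive;                 // HTTP/1.0: only when explicitly requested
    }
}
```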

HTTP Connection Pool

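In the traditional approach, every request opens a new HTTP connection. Creating and tearing down connections (the TCP handshake, plus the TLS handshake for HTTPS) is expensive, and under heavy concurrency it can become a performance bottleneck.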

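An HTTP connection pool keeps a set of reusable connections between the application and the HTTP server, cutting the cost of creating and destroying connections and improving the application's throughput and response times.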
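The core idea fits in a few lines. This single-threaded sketch uses a hypothetical SimplePool class in which any object stands in for a connection: it reuses an idle connection when one exists and creates a new one only otherwise. It deliberately has none of the limits, timeouts, or thread safety a real pool needs.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.function.Supplier;

// Hypothetical, single-threaded sketch of connection reuse (no limits, no locking).
final class SimplePool<C> {
    private final Deque<C> idle = new ArrayDeque<>();
    private final Supplier<C> factory;
    private int created;

    SimplePool(Supplier<C> factory) {
        this.factory = factory;
    }

    /** Reuse an idle connection if there is one, otherwise open a new one. */
    C acquire() {
        C conn = idle.pollFirst();
        if (conn == null) {
            conn = factory.get();
            created++;
        }
        return conn;
    }

    /** Hand a connection back so the next request can reuse it instead of reconnecting. */
    void release(C conn) {
        idle.addFirst(conn);
    }

    /** Number of connections actually opened so far. */
    int createdCount() {
        return created;
    }
}
```

Sending ten requests one after another through such a pool (acquire, use, release) opens one connection instead of ten.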

The HttpClient Connection Pool

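The Apache HttpClient connection pool is one of the most important tools for managing and optimizing HTTP connections. Let's use it as an example to see how a connection pool is implemented.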

Using the HttpClient Connection Pool

Apache HttpClient manages pooled HTTP connections through PoolingHttpClientConnectionManager.

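Configure the pool with setMaxTotal(), the maximum number of connections overall, and setDefaultMaxPerRoute(), the maximum number of connections per route. A route is, roughly, one target host and port (plus any proxy); an individual route can be given its own limit with setMaxPerRoute(). The defaults are small (20 in total, 2 per route), so they usually need raising.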

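import org.apache.http.client.config.RequestConfig;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.DefaultHttpRequestRetryHandler;
import org.apache.http.impl.client.HttpClients;
import org.apache.http.impl.conn.PoolingHttpClientConnectionManager;

public class HttpUtil {

    public static final CloseableHttpClient httpClient;

    static {
        // Timeouts in ms: TCP connect, waiting for a connection from the pool,
        // and socket read inactivity
        RequestConfig config = RequestConfig.custom()
                .setConnectTimeout(5000)
                .setConnectionRequestTimeout(5000)
                .setSocketTimeout(5000)
                .build();

        // Create the pooling connection manager
        PoolingHttpClientConnectionManager cm = new PoolingHttpClientConnectionManager();
        // At most 100 connections in total and at most 20 per route (target host)
        cm.setMaxTotal(100);
        cm.setDefaultMaxPerRoute(20);

        httpClient = HttpClients.custom()
                // Use the pooling connection manager
                .setConnectionManager(cm)
                // Default timeouts for every request
                .setDefaultRequestConfig(config)
                // Retry up to 2 times; do not retry requests that were already sent
                .setRetryHandler(new DefaultHttpRequestRetryHandler(2, false))
                .build();
    }
}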

How the HttpClient Pool Is Managed

Using HttpClient 4.5.14, the latest 4.5.x release at the time of writing, let's look at how the pool is managed.

The Call Chain

Let's start with an ordinary HTTP call. Its entry point is the execute() method of CloseableHttpClient. First, the overall call chain:

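public static void main(String[] args) throws IOException {
    HttpGet get = new HttpGet("http://www.baidu.com");
    // try-with-resources always closes the response. If the entity was not fully read
    // (e.g. on a non-200 status), closing discards the connection but still frees its
    // pool slot; without it the slot would stay leased.
    try (CloseableHttpResponse response = HttpUtil.httpClient.execute(get)) {
        if (response.getStatusLine().getStatusCode() == 200) {
            HttpEntity resEntity = response.getEntity();
            // Reading the entity to the end returns the connection to the pool for reuse
            String message = EntityUtils.toString(resEntity, "UTF-8");
            System.out.println(message);
        } else {
            System.out.println("Request failed");
        }
    }
}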

execute() in turn delegates to doExecute():

// CloseableHttpClient#execute
@Override
public CloseableHttpResponse execute(
        final HttpUriRequest request,
        final HttpContext context) throws IOException, ClientProtocolException {
    Args.notNull(request, "HTTP request");
    return doExecute(determineTarget(request), request, context);
}

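doExecute() is abstract in CloseableHttpClient and is implemented by its subclasses InternalHttpClient and MinimalHttpClient; which one you get depends on how the client is created. With the HttpClients.custom()...build() approach above, it is InternalHttpClient (HttpClients.createMinimal() gives you a MinimalHttpClient).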
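The execute()/doExecute() split is the template-method pattern: the public method is fixed in the base class, and each subclass supplies its own doExecute(). The sketch below only shows that shape; the class names are hypothetical and the string-based target extraction is a crude stand-in for determineTarget(), not HttpClient code.

```java
import java.util.Objects;

// Hypothetical classes mirroring the shape of CloseableHttpClient.execute()/doExecute().
abstract class SketchClient {
    /** Fixed entry point: validate, work out the target host, then delegate. */
    final String execute(String uri) {
        Objects.requireNonNull(uri, "HTTP request");
        // Crude stand-in for determineTarget(): strip the scheme, keep host[:port]
        String target = uri.replaceFirst("^https?://", "").split("/", 2)[0];
        return doExecute(target, uri);
    }

    /** Hook each concrete client implements in its own way. */
    protected abstract String doExecute(String target, String uri);
}

// Plays the role of InternalHttpClient (full execution chain).
final class FullSketchClient extends SketchClient {
    @Override
    protected String doExecute(String target, String uri) {
        return "full chain -> " + target;
    }
}

// Plays the role of MinimalHttpClient (bare-bones execution).
final class MinimalSketchClient extends SketchClient {
    @Override
    protected String doExecute(String target, String uri) {
        return "minimal -> " + target;
    }
}
```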

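Next, look at doExecute() in InternalHttpClient. It hands the request to its execution chain, which, after decorators such as retry and protocol handling, ends in MainClientExec.execute(). That method leases a connection from the pool, executes the request, returns an HttpResponse, and finally releases the connection (or, if the entity is still being streamed, leaves the release to the response).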

// MainClientExec#execute
@Override
public CloseableHttpResponse execute(
        final HttpRoute route,
        final HttpRequestWrapper request,
        final HttpClientContext context,
        final HttpExecutionAware execAware) throws IOException, HttpException {
    ....
    Object userToken = context.getUserToken();
    // Ask the pool for a connection; this returns a ConnectionRequest and does not block yet
    final ConnectionRequest connRequest = connManager.requestConnection(route, userToken);
    // Request configuration (timeouts etc.)
    final RequestConfig config = context.getRequestConfig();

    final HttpClientConnection managedConn;
    try {
        final int timeout = config.getConnectionRequestTimeout();
        // Block until the pool hands out a connection, waiting at most connectionRequestTimeout ms (0 = wait indefinitely)
        managedConn = connRequest.get(timeout > 0 ? timeout : 0, TimeUnit.MILLISECONDS);
    } catch(final InterruptedException interrupted) {
        ...
    } catch(final ExecutionException ex) {
       ...
    }

    final ConnectionHolder connHolder = new ConnectionHolder(this.log, this.connManager, managedConn);
    try {
        ...
        HttpResponse response;
        for (int execCount = 1;; execCount++) {
            .....
            // Apply the socket (read) timeout to this connection
            final int timeout = config.getSocketTimeout();
            if (timeout >= 0) {
                managedConn.setSocketTimeout(timeout);
            }
            context.setAttribute(HttpCoreContext.HTTP_REQUEST, request);
            // Send the HTTP request and receive the response
            response = requestExecutor.execute(request, managedConn, context);

            // The connection is in or can be brought to a re-usable state.
            if (reuseStrategy.keepAlive(response, context)) {
                // Set the idle duration of this connection (how long it may stay in the pool)
                final long duration = keepAliveStrategy.getKeepAliveDuration(response, context);
               ....
                connHolder.setValidFor(duration, TimeUnit.MILLISECONDS);
                connHolder.markReusable();
            } else {
                connHolder.markNonReusable();
            }
            // ... (authentication handling elided; the loop breaks once no further auth round trip is needed)
        }
        // check for entity, release connection if possible
        final HttpEntity entity = response.getEntity();
        if (entity == null || !entity.isStreaming()) {
            // connection not needed and (assumed to be) in re-usable state
            // Release the connection back to the pool right away
            connHolder.releaseConnection();
            return new HttpResponseProxy(response, null);
        }
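        // Streaming entity: the connection is released once the entity is consumed or the response is closed
        return new HttpResponseProxy(response, connHolder);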
    } catch (final Error error) {
        connManager.shutdown();
        throw error;
    }
}
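The duration passed to connHolder.setValidFor() comes from the keep-alive strategy. Below is a simplified, JDK-only sketch of the idea behind HttpClient 4.x's DefaultConnectionKeepAliveStrategy: read the timeout parameter (in seconds) of the server's Keep-Alive response header and convert it to milliseconds, returning -1 when no usable timeout is given. The class name and the string-based parsing are assumptions for illustration; the real strategy uses HttpClient's header-element parser, and a custom strategy can be plugged in with HttpClientBuilder.setKeepAliveStrategy().

```java
import java.util.Locale;

// Hypothetical, simplified stand-in for the idea behind DefaultConnectionKeepAliveStrategy.
final class KeepAliveDuration {
    private KeepAliveDuration() {}

    /**
     * @param keepAliveHeader value of the Keep-Alive response header, e.g. "timeout=5, max=100", or null
     * @return how long (ms) the connection may stay idle in the pool; -1 means "no timeout given"
     */
    static long millis(String keepAliveHeader) {
        if (keepAliveHeader == null) {
            return -1;
        }
        for (String param : keepAliveHeader.split(",")) {
            String[] kv = param.trim().split("=", 2);
            if (kv.length == 2 && kv[0].trim().toLowerCase(Locale.ROOT).equals("timeout")) {
                try {
                    return Long.parseLong(kv[1].trim()) * 1000L;  // header value is in seconds
                } catch (NumberFormatException ignore) {
                    // Malformed value: ignore it and keep scanning
                }
            }
        }
        return -1;
    }
}
```

A non-positive duration means the pooled entry gets no keep-alive expiry, so only the pool's own limits and idle cleanup will retire it.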

Obtaining a Connection

From the call chain above we know that sending an HTTP request first obtains a connection from the pool. The call that does this is requestConnection() in PoolingHttpClientConnectionManager:

//PoolingHttpClientConnectionManager#requestConnection
public ConnectionRequest requestConnection(
        final HttpRoute route,
        final Object state) {
    .....
    // Ask the pool for a CPoolEntry (a wrapper around the connection); lease() returns a Future immediately
    final Future<CPoolEntry> future = this.pool.lease(route, state, null);
    return new ConnectionRequest() {

        @Override
        public boolean cancel() {
            return future.cancel(true);
        }

        // Delegates to leaseConnection(), passing in the Future<CPoolEntry>
        @Override
        public HttpClientConnection get(
                final long timeout,
                final TimeUnit timeUnit) throws InterruptedException, ExecutionException, ConnectionPoolTimeoutException {
            // Lease the connection (blocks for at most the given timeout)
            final HttpClientConnection conn = leaseConnection(future, timeout, timeUnit);
           .....
            return conn;
        }
    };

}

Now look at leaseConnection(). The CPoolEntry (the wrapper around the connection) is obtained here by calling future.get():

protected HttpClientConnection leaseConnection(
        final Future<CPoolEntry> future,
        final long timeout,
        final TimeUnit timeUnit) throws InterruptedException, ExecutionException, ConnectionPoolTimeoutException {
    final CPoolEntry entry;
    try {
        // Block on the Future until the pool supplies an entry or the timeout expires
        entry = future.get(timeout, timeUnit);
        if (entry == null || future.isCancelled()) {
            throw new ExecutionException(new CancellationException("Operation cancelled"));
        }
     ....
        return CPoolProxy.newProxy(entry);
    } catch (final TimeoutException ex) {
        throw new ConnectionPoolTimeoutException("Timeout waiting for connection from pool");
    }
}
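Stripped of HttpClient types, this step is a bounded wait on a Future plus exception translation. In the sketch below, LeaseSketch and PoolTimeoutException are hypothetical stand-ins (the latter for ConnectionPoolTimeoutException), and any Future implementation can play the pool's role.

```java
import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical names; mirrors how leaseConnection() turns a pool Future into a connection.
final class LeaseSketch {
    /** Stand-in for HttpClient's ConnectionPoolTimeoutException. */
    static final class PoolTimeoutException extends Exception {
        PoolTimeoutException(String message) {
            super(message);
        }
    }

    private LeaseSketch() {}

    /** Waits at most {@code timeout} for the pool to hand over an entry. */
    static <E> E lease(Future<E> future, long timeout, TimeUnit unit)
            throws InterruptedException, ExecutionException, PoolTimeoutException {
        try {
            E entry = future.get(timeout, unit);
            if (entry == null || future.isCancelled()) {
                throw new ExecutionException(new CancellationException("Operation cancelled"));
            }
            return entry;
        } catch (TimeoutException ex) {
            // Translate the generic timeout into a pool-specific exception
            throw new PoolTimeoutException("Timeout waiting for connection from pool");
        }
    }
}
```

One difference worth noting: for a typical Future such as CompletableFuture, a zero timeout means "do not wait", whereas HttpClient's pool Future treats 0 as "wait indefinitely", because getPoolEntryBlocking() sets no deadline when the timeout is not positive.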

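So let's go to lease() in AbstractConnPool (this class lives in HttpCore 4.4.x, which HttpClient 4.5.x depends on). lease() returns an anonymous Future; the get() method of that Future, shown below, is what leaseConnection() above calls to obtain the pool entry: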

// AbstractConnPool#lease: get() of the returned Future
public E get(final long timeout, final TimeUnit timeUnit) throws InterruptedException, ExecutionException, TimeoutException {
    for (;;) {
        synchronized (this) {
            try {
                final E entry = entryRef.get();
                if (entry != null) {
                    return entry;
                }
                if (done.get()) {
                    throw new ExecutionException(operationAborted());
                }
                final E leasedEntry = getPoolEntryBlocking(route, state, timeout, timeUnit, this);
                // Entry idle for at least validateAfterInactivity ms and failing validation: close and release it, then try again
                if (validateAfterInactivity > 0)  {
                    if (leasedEntry.getUpdated() + validateAfterInactivity <= System.currentTimeMillis()) {
                        if (!validate(leasedEntry)) {
                            leasedEntry.close();
                            release(leasedEntry, false);
                            continue;
                        }
                    }
                }
                if (done.compareAndSet(false, true)) {
                    entryRef.set(leasedEntry);
                    done.set(true);
                    onLease(leasedEntry);
                    if (callback != null) {
                        callback.completed(leasedEntry);
                    }
                    return leasedEntry;
                } else {
                    release(leasedEntry, true);
                    throw new ExecutionException(operationAborted());
                }
            } catch (final IOException ex) {
                if (done.compareAndSet(false, true)) {
                    if (callback != null) {
                        callback.failed(ex);
                    }
                }
                throw new ExecutionException(ex);
            }
        }
    }
}
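The validateAfterInactivity check above decides when a pooled connection is re-validated before reuse; the interval is configurable through PoolingHttpClientConnectionManager.setValidateAfterInactivity(). Isolated as a pure function with an explicit clock, under a hypothetical class name, the condition looks like this:

```java
// Hypothetical helper reproducing only the stale-check condition from the lease Future's get().
final class StaleCheck {
    private StaleCheck() {}

    /**
     * @param lastUpdatedMillis             when the entry was last used or returned to the pool
     * @param validateAfterInactivityMillis re-validation threshold; <= 0 disables the check
     * @param nowMillis                     current time
     * @return true if the entry must be validated before it is handed out
     */
    static boolean needsValidation(long lastUpdatedMillis, long validateAfterInactivityMillis, long nowMillis) {
        if (validateAfterInactivityMillis <= 0) {
            return false;  // check disabled
        }
        return lastUpdatedMillis + validateAfterInactivityMillis <= nowMillis;
    }
}
```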

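The actual work is done by getPoolEntryBlocking(). When the pool is exhausted it blocks until another thread releases a connection or the timeout expires. Here is the key code: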

// AbstractConnPool#getPoolEntryBlocking
private E getPoolEntryBlocking(
        final T route, final Object state,
        final long timeout, final TimeUnit timeUnit,
        final Future<E> future) throws IOException, InterruptedException, ExecutionException, TimeoutException {
    // Compute the deadline (null means wait indefinitely)
    Date deadline = null;
    if (timeout > 0) {
        deadline = new Date (System.currentTimeMillis() + timeUnit.toMillis(timeout));
    }
    // Acquire the pool lock
    this.lock.lock();
    try {
        E entry;
        for (;;) {
            Asserts.check(!this.isShutDown, "Connection pool shut down");
            if (future.isCancelled()) {
                throw new ExecutionException(operationAborted());
            }
            // Look up this route's pool in the map (keyed by route), creating it if absent
            final RouteSpecificPool<T, C, E> pool = getPool(route);
            for (;;) {
                // Take an idle entry from the route's available list, preferring one whose state matches, and move it to the route's leased set
                entry = pool.getFree(state);
                // No idle connection: leave this loop and try to create a new one below
                if (entry == null) {
                    break;
                }
                // Close the connection if its keep-alive validity has expired
                if (entry.isExpired(System.currentTimeMillis())) {
                    entry.close();
                }
                if (entry.isClosed()) {
                    // Connection is closed: drop it and try the next idle one
                    this.available.remove(entry);
                    pool.free(entry, false);
                } else {
                    break;
                }
            }
            // Found a usable idle connection: move it from the global available list to the global leased set and return it
            if (entry != null) {
                this.available.remove(entry);
                this.leased.add(entry);
                onReuse(entry);
                return entry;
            }
            
            // New connection is needed
            // Maximum number of connections allowed for this route
            final int maxPerRoute = getMax(route);
            // Shrink the pool prior to allocating a new connection
            final int excess = Math.max(0, pool.getAllocatedCount() + 1 - maxPerRoute);
            // This route already has maxPerRoute or more connections: close idle ones to make room
            if (excess > 0) {
                for (int i = 0; i < excess; i++) {
                    // Close the least recently used idle connection, taken from the tail of this route's available list
                    final E lastUsed = pool.getLastUsed();
                    if (lastUsed == null) {
                        break;
                    }
                    lastUsed.close();
                    this.available.remove(lastUsed);
                    pool.remove(lastUsed);
                }
            }
            // The route is below its limit
            if (pool.getAllocatedCount() < maxPerRoute) {
                final int totalUsed = this.leased.size();
                final int freeCapacity = Math.max(this.maxTotal - totalUsed, 0);
                if (freeCapacity > 0) {
                    final int totalAvailable = this.available.size();
                    // Idle connections >= remaining total capacity: one more would exceed maxTotal, so evict an idle one first
                    if (totalAvailable > freeCapacity - 1) {
                        // Remove the least recently used connection from the global available list and from its route pool, and close it
                        final E lastUsed = this.available.removeLast();
                        lastUsed.close();
                        final RouteSpecificPool<T, C, E> otherpool = getPool(lastUsed.getRoute());
                        otherpool.remove(lastUsed);
                    }
                    // Create a new connection, add it to the global leased set and the route pool, and return it
                    final C conn = this.connFactory.create(route);
                    entry = pool.add(conn);
                    this.leased.add(entry);
                    return entry;
                }
            }
            // The pool is exhausted: no connection can be allocated right now, so wait
            boolean success = false;
            try {
                pool.queue(future);
                // Also register the request in the global pending queue
                this.pending.add(future);
                // Block until signalled, or until the deadline if a timeout was set
                if (deadline != null) {
                    success = this.condition.awaitUntil(deadline);
                } else {
                    this.condition.await();
                    success = true;
                }
                if (future.isCancelled()) {
                    throw new ExecutionException(operationAborted());
                }
            } finally {
                // In case of 'success', we were woken up by the
                // connection pool and should now have a connection
                // waiting for us, or else we're shutting down.
                // Just continue in the loop, both cases are checked.
                // Remove the request from the pending queues
                pool.unqueue(future);
                this.pending.remove(future);
            }
            // check for spurious wakeup vs. timeout
            // Timed out (not a spurious wakeup): leave the loop and throw the timeout exception
            if (!success && (deadline != null && deadline.getTime() <= System.currentTimeMillis())) {
                break;
            }
        }
        throw new TimeoutException("Timeout waiting for connection");
    } finally {
        // Release the lock
        this.lock.unlock();
    }
}
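To see the locking pattern without the bookkeeping, here is a heavily simplified sketch of the same algorithm: reuse an idle connection, otherwise create one if both the per-route and total limits allow it, otherwise wait on a Condition until release() signals or the deadline passes. BlockingRoutePool and its methods are hypothetical, and the sketch omits expiry, state matching, cancellation, the pending queues, and evicting other routes' idle connections, so it is a teaching aid rather than a replacement for AbstractConnPool.

```java
import java.util.ArrayDeque;
import java.util.Date;
import java.util.Deque;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Function;

// Hypothetical, heavily simplified sketch of getPoolEntryBlocking()/release().
final class BlockingRoutePool<R, C> {
    private final ReentrantLock lock = new ReentrantLock();
    private final Condition changed = lock.newCondition();
    private final Map<R, Deque<C>> idle = new HashMap<>();      // per-route available lists
    private final Map<R, Integer> allocated = new HashMap<>();  // per-route open connections
    private final Function<R, C> factory;
    private final int maxPerRoute;
    private final int maxTotal;
    private int totalAllocated;

    BlockingRoutePool(Function<R, C> factory, int maxPerRoute, int maxTotal) {
        this.factory = factory;
        this.maxPerRoute = maxPerRoute;
        this.maxTotal = maxTotal;
    }

    C lease(R route, long timeout, TimeUnit unit) throws InterruptedException, TimeoutException {
        // Absolute deadline; null means wait indefinitely, as in the original
        Date deadline = timeout > 0 ? new Date(System.currentTimeMillis() + unit.toMillis(timeout)) : null;
        lock.lock();
        try {
            for (;;) {
                // 1. Reuse an idle connection of this route, most recently released first
                C conn = idle.computeIfAbsent(route, r -> new ArrayDeque<>()).pollFirst();
                if (conn != null) {
                    return conn;
                }
                // 2. Open a new connection if both the route and the total limits allow it
                int routeCount = allocated.getOrDefault(route, 0);
                if (routeCount < maxPerRoute && totalAllocated < maxTotal) {
                    allocated.put(route, routeCount + 1);
                    totalAllocated++;
                    return factory.apply(route);
                }
                // 3. Pool exhausted: wait until release() signals or the deadline passes
                boolean signalled;
                if (deadline != null) {
                    signalled = changed.awaitUntil(deadline);
                } else {
                    changed.await();
                    signalled = true;
                }
                if (!signalled && deadline.getTime() <= System.currentTimeMillis()) {
                    throw new TimeoutException("Timeout waiting for connection");
                }
                // Signal or spurious wakeup: loop and re-check everything
            }
        } finally {
            lock.unlock();
        }
    }

    void release(R route, C conn, boolean reusable) {
        lock.lock();
        try {
            if (reusable) {
                // Head of the list, so the most recently used connection is reused first
                idle.computeIfAbsent(route, r -> new ArrayDeque<>()).addFirst(conn);
            } else {
                // Connection discarded: free its slot under both limits
                allocated.merge(route, -1, Integer::sum);
                totalAllocated--;
            }
            // Wake all waiters; each re-checks its own route in the lease() loop
            changed.signalAll();
        } finally {
            lock.unlock();
        }
    }
}
```

Because awaitUntil() can return early on a spurious wakeup, the loop re-checks everything and only throws once the deadline has actually passed, which is the same check the original performs at the end of its loop.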

Releasing a Connection

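As the code above shows, connections are released in two places: while leasing, an entry that fails validation is closed and released; and after the HTTP request completes, its connection is released back to the pool. Both paths end in the release() method of AbstractConnPool: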

// AbstractConnPool#release
@Override
public void release(final E entry, final boolean reusable) {
    // Acquire the pool lock
    this.lock.lock();
    try {
        // Remove the entry from the global leased set
        if (this.leased.remove(entry)) {
            final RouteSpecificPool<T, C, E> pool = getPool(entry.getRoute());
            // Return the entry to its route pool (kept as available only if reusable)
            pool.free(entry, reusable);
            if (reusable && !this.isShutDown) {
                // Reusable: put it at the head of the global available list
                this.available.addFirst(entry);
                onRelease(entry);
            } else {
                entry.close();
            }
            // Pick a waiting request (this route's first, otherwise any) and wake the waiting threads
            Future<E> future = pool.nextPending();
            if (future != null) {
                this.pending.remove(future);
            } else {
                future = this.pending.poll();
            }
            if (future != null) {
                this.condition.signalAll();
            }
        }
    } finally {
        // Release the lock
        this.lock.unlock();
    }
}
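release() puts reusable entries at the head of the available list (addFirst), while getPoolEntryBlocking() evicts from the tail (getLastUsed(), removeLast()). The hypothetical class below isolates just that ordering:

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Hypothetical class showing only the ordering discipline of the pool's "available" list.
final class AvailableList<E> {
    private final Deque<E> available = new ArrayDeque<>();

    /** release(): a reusable entry is pushed onto the head. */
    void release(E entry) {
        available.addFirst(entry);
    }

    /** Reuse takes from the head: the most recently released connection. */
    E takeForReuse() {
        return available.pollFirst();
    }

    /** Eviction takes from the tail: the connection that has been idle the longest. */
    E evictOldest() {
        return available.pollLast();
    }
}
```

A plausible benefit of this LIFO order is that a small set of recently used connections stays in use, while rarely used ones drift to the tail, where they are the first to be evicted or to expire.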

Closing Connections

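Connections are closed in two situations: when leasing, the pool checks whether a connection is still usable and removes and closes it if not; and when a route or the pool as a whole is at its connection limit, surplus idle connections are closed to make room.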

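In other words, HttpClient cleans up dead connections only passively, when it happens to touch them. If the application can go a long time without sending requests, it is advisable to evict expired and idle connections periodically, for example with a background thread like this: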

public static class IdleConnectionMonitorThread extends Thread {
    private final HttpClientConnectionManager connMgr;
    private volatile boolean shutdown;
    
    public IdleConnectionMonitorThread(HttpClientConnectionManager connMgr) {
        super();
        this.connMgr = connMgr;
    }

    @Override
    public void run() {
        try {
            while (!shutdown) {
                synchronized (this) {
                    wait(5000);
                    // Close expired connections
                    connMgr.closeExpiredConnections();
                    // Optionally, close connections
                    // that have been idle longer than 30 sec
                    connMgr.closeIdleConnections(30, TimeUnit.SECONDS);
                }
            }
        } catch (InterruptedException ex) {
            // terminate
        }
    }
    
    public void shutdown() {
        shutdown = true;
        synchronized (this) {
            notifyAll();
        }
    }
    
}
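The monitor thread only triggers the cleanup; the per-connection decision is simple. In HttpClient 4.x's pool, closeExpiredConnections() closes idle entries whose keep-alive deadline has passed, and closeIdleConnections(30, TimeUnit.SECONDS) closes idle entries last returned to the pool at least 30 seconds ago. The sketch below reproduces only that decision, using a hypothetical IdleSweeper class and an explicit clock so the logic is easy to test.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch of the decisions behind closeExpiredConnections() and closeIdleConnections().
final class IdleSweeper {
    static final class Entry {
        final String id;
        final long updatedMillis;  // when the entry was last returned to the pool
        final long expiryMillis;   // keep-alive deadline, Long.MAX_VALUE if none

        Entry(String id, long updatedMillis, long expiryMillis) {
            this.id = id;
            this.updatedMillis = updatedMillis;
            this.expiryMillis = expiryMillis;
        }
    }

    private final List<Entry> idle = new ArrayList<>();

    void add(Entry entry) {
        idle.add(entry);
    }

    /** Removes idle entries that expired or sat unused for at least idleTime; returns their ids. */
    List<String> sweep(long nowMillis, long idleTime, TimeUnit unit) {
        long idleCutoff = nowMillis - unit.toMillis(idleTime);
        List<String> closed = new ArrayList<>();
        for (Iterator<Entry> it = idle.iterator(); it.hasNext(); ) {
            Entry e = it.next();
            boolean expired = nowMillis >= e.expiryMillis;    // closeExpiredConnections()
            boolean tooIdle = e.updatedMillis <= idleCutoff;  // closeIdleConnections()
            if (expired || tooIdle) {
                closed.add(e.id);  // a real pool would close the socket here
                it.remove();
            }
        }
        return closed;
    }
}
```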

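When the HttpClient is no longer needed, call its close() method so that its connection manager shuts down and releases the pooled connections and other resources (and stop any monitor thread with shutdown()).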

Summary

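The connection pool is a core component of many HTTP client libraries. By relying on persistent (keep-alive) connections, a pool reuses connections and avoids repeated TCP (and TLS) handshakes, which can noticeably improve performance, especially for high-volume, high-frequency requests.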

Reposted from: https://juejin.cn/post/7291135064838029327