A Look at HTTP Connection Pools
Summary
1. Apache HttpClient
2. OkHttp3
3. reactor-netty HttpClient
4. Connection pools used by Zuul, Feign, and Spring Cloud Gateway
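I. Occasional slow-endpoint alerts in production
On January 16, between 9:12:55 and 9:12:56 a.m., a window of about one second, nginx raised an alert: 73 requests had each taken longer than 3s.
A simplified diagram of the system architecture:
According to the statistics, every slow request hit endpoint 5703, which belongs to operation-service; all other services were normal.
1. Which service was at fault, the Zuul gateway or operation-service?
SkyWalking monitoring showed that operation-service itself took only 12 ms, so the problem was most likely in Zuul.
2. Since everything recovered within about 1s, could a Full GC have been running at that moment?
jstat -gc showed 6 full GCs taking 0.775s in total, under 200ms each on average. The logs also showed that between second 55 and second 56 plenty of other requests still returned within 20ms, so full GC can be ruled out with reasonable confidence.
3. Could it be related to the connection pool?
Zuul 1.0 load-balances through Ribbon by default. Apart from ribbon.ReadTimeout, which had been changed to 10s, the Ribbon configuration was all defaults.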
public class DefaultClientConfigImpl implements IClientConfig {
// Maximum connections per route (host): 50
public static final int DEFAULT_MAX_CONNECTIONS_PER_HOST = 50;
// Maximum total connections: 200
public static final int DEFAULT_MAX_TOTAL_CONNECTIONS = 200;
}
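The logs showed a hundred or more requests per second, about 80% of them going to operation-service. Seen that way, as soon as any operation-service endpoint slows down a little, the default 50 connections are likely to run out. The limits can be raised:
ribbon:
  MaxConnectionsPerHost: 200
  MaxTotalConnections: 500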
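As a rough sanity check on that reasoning, Little's law estimates how many connections are busy at once: arrival rate multiplied by the time each request holds a connection. The sketch below is illustrative only. The figure of 80 requests per second approximates the share of traffic described above, the 12 ms latency is the value seen in SkyWalking, and the 1 s "slow" latency is an assumed value, not a measurement. `PoolSizing` is a hypothetical helper name.

```java
// Rough pool sizing with Little's law: connections in use ≈ arrival rate × hold time.
// The inputs are illustrative assumptions, not measurements from the incident.
class PoolSizing {

    /** Estimated number of connections held concurrently (rounded up). */
    static int connectionsInUse(double requestsPerSecond, double holdTimeMillis) {
        return (int) Math.ceil(requestsPerSecond * holdTimeMillis / 1000.0);
    }

    public static void main(String[] args) {
        double toOperationService = 80; // about 80% of roughly 100 req/s at the gateway

        // Healthy case: ~12 ms per call keeps only a single connection busy.
        System.out.println(connectionsInUse(toOperationService, 12));
        // Slow case (assumed 1 s per call): 80 connections, more than the default 50.
        System.out.println(connectionsInUse(toOperationService, 1000));
    }
}
```

Under these assumptions a brief slowdown is enough to exhaust the default per-host limit, while a limit of 200 leaves headroom.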
That leads to the topic of this article: a comparison of HTTP connection pools.
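II. Introduction to HTTP connection pools
0. Common connection pool parameters
The main benefit of an HTTP connection pool is connection reuse: it cuts the overhead of creating and tearing down TCP connections and improves performance.
The commonly used parameters are:
**Maximum connections:** caps the number of connections in the pool. It must not be set too low, otherwise requests tend to block under high concurrency.
**Maximum connections per route:** mostly used in gateways. In a microservice architecture a gateway may connect to dozens of services; capping the connections per route means a failing service affects the others as little as possible.
**Maximum idle connections:** connections that sit in the pool unused are a waste, so some of them can be cleaned up.
**Maximum idle time:** when the pool holds more than the maximum number of idle connections, those that have been idle longer than this limit can be cleaned up.
**Maximum lifetime:** once a connection exceeds its maximum lifetime it can be cleaned up immediately, as long as it is not in use.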
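To make the interplay of the last three parameters concrete, here is a minimal sketch of the eviction decision as described in the list above. `PoolLimits` and its fields are hypothetical names, not part of any of the libraries discussed, and real pools differ in the details.

```java
// Hypothetical model of the idle/lifetime rules described above; not a real library class.
class PoolLimits {
    final int maxIdleConnections;
    final long maxIdleMillis;
    final long maxLifeMillis;

    PoolLimits(int maxIdleConnections, long maxIdleMillis, long maxLifeMillis) {
        this.maxIdleConnections = maxIdleConnections;
        this.maxIdleMillis = maxIdleMillis;
        this.maxLifeMillis = maxLifeMillis;
    }

    /**
     * Decides whether an idle (not leased) connection should be closed.
     *
     * @param idleCount  idle connections currently in the pool
     * @param idleMillis how long this connection has been idle
     * @param ageMillis  time since this connection was created
     */
    boolean shouldEvict(int idleCount, long idleMillis, long ageMillis) {
        // Past its maximum lifetime: close it regardless of how many are idle.
        if (ageMillis > maxLifeMillis) {
            return true;
        }
        // Otherwise close it only if the pool holds too many idle connections
        // and this one has been idle for too long.
        return idleCount > maxIdleConnections && idleMillis > maxIdleMillis;
    }

    public static void main(String[] args) {
        PoolLimits limits = new PoolLimits(10, 60_000, 900_000);
        System.out.println(limits.shouldEvict(5, 120_000, 200_000));  // false: few idle connections
        System.out.println(limits.shouldEvict(20, 120_000, 200_000)); // true: too many idle, idle too long
        System.out.println(limits.shouldEvict(5, 1_000, 1_000_000));  // true: exceeded max lifetime
    }
}
```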
1. Apache HttpClient (version 4.5.5)
Zuul 1.0 uses HttpClient by default, and the pool is normally created through PoolingHttpClientConnectionManager.
public PoolingHttpClientConnectionManager(
final HttpClientConnectionOperator httpClientConnectionOperator,
final HttpConnectionFactory<HttpRoute, ManagedHttpClientConnection> connFactory,
final long timeToLive, final TimeUnit tunit) {
super();
this.configData = new ConfigData();
this.pool = new CPool(new InternalConnectionFactory(
this.configData, connFactory), 2, 20, timeToLive, tunit);
this.pool.setValidateAfterInactivity(2000);
this.connectionOperator = Args.notNull(httpClientConnectionOperator, "HttpClientConnectionOperator");
this.isShutDown = new AtomicBoolean(false);
}
public CPool(
final ConnFactory<HttpRoute, ManagedHttpClientConnection> connFactory,
final int defaultMaxPerRoute, final int maxTotal,
final long timeToLive, final TimeUnit tunit) {
super(connFactory, defaultMaxPerRoute, maxTotal);
this.timeToLive = timeToLive;
this.tunit = tunit;
}
public AbstractConnPool(
final ConnFactory<T, C> connFactory,
final int defaultMaxPerRoute,
final int maxTotal) {
super();
this.connFactory = Args.notNull(connFactory, "Connection factory");
this.defaultMaxPerRoute = Args.positive(defaultMaxPerRoute, "Max per route value");
this.maxTotal = Args.positive(maxTotal, "Max total value");
this.lock = new ReentrantLock();
this.condition = this.lock.newCondition();
this.routeToPool = new HashMap<T, RouteSpecificPool<T, C, E>>();
this.leased = new HashSet<E>();
this.available = new LinkedList<E>();
this.pending = new LinkedList<Future<E>>();
this.maxPerRoute = new HashMap<T, Integer>();
}
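As shown, the defaults are defaultMaxPerRoute=2 and maxTotal=20: at most 2 connections per route and 20 in total. These defaults are far too small and impose an obvious limit on concurrency.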
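To illustrate what those two limits mean in practice, the following sketch models a pool with a per-route cap and a global cap using plain semaphores. It is a simplified stand-in, not Apache HttpClient's actual implementation: the real pool makes the caller wait for a free connection, whereas this model simply reports failure. `RouteLimiter` and `tryLease` are hypothetical names.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Semaphore;

// Simplified model of "max per route" plus "max total"; hypothetical, stdlib only.
class RouteLimiter {
    private final int maxPerRoute;
    private final Semaphore total;
    private final Map<String, Semaphore> perRoute = new ConcurrentHashMap<>();

    RouteLimiter(int maxPerRoute, int maxTotal) {
        this.maxPerRoute = maxPerRoute;
        this.total = new Semaphore(maxTotal);
    }

    /** Tries to lease a connection for the route without waiting. */
    boolean tryLease(String route) {
        Semaphore routePermits = perRoute.computeIfAbsent(route, r -> new Semaphore(maxPerRoute));
        if (!routePermits.tryAcquire()) {
            return false; // this route already holds its maximum
        }
        if (!total.tryAcquire()) {
            routePermits.release(); // undo the route permit: the whole pool is full
            return false;
        }
        return true;
    }

    /** Returns a previously leased connection to the pool. */
    void release(String route) {
        perRoute.get(route).release();
        total.release();
    }

    public static void main(String[] args) {
        RouteLimiter pool = new RouteLimiter(2, 20); // the defaults seen above
        System.out.println(pool.tryLease("operation-service")); // true
        System.out.println(pool.tryLease("operation-service")); // true
        System.out.println(pool.tryLease("operation-service")); // false: route limit of 2 reached
        System.out.println(pool.tryLease("other-service"));     // true: other routes are unaffected
    }
}
```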
Next, let's see how Zuul configures this pool.
// HttpClientRibbonConfiguration
@Bean
@ConditionalOnMissingBean(HttpClientConnectionManager.class)
public HttpClientConnectionManager httpClientConnectionManager(
IClientConfig config,
ApacheHttpClientConnectionManagerFactory connectionManagerFactory) {
RibbonProperties ribbon = RibbonProperties.from(config);
// Maximum total connections
int maxTotalConnections = ribbon.maxTotalConnections();
// Maximum connections per route
int maxConnectionsPerHost = ribbon.maxConnectionsPerHost();
int timerRepeat = ribbon.connectionCleanerRepeatInterval();
long timeToLive = ribbon.poolKeepAliveTime();
TimeUnit ttlUnit = ribbon.getPoolKeepAliveTimeUnits();
final HttpClientConnectionManager connectionManager = connectionManagerFactory
.newConnectionManager(false, maxTotalConnections,
maxConnectionsPerHost, timeToLive, ttlUnit, registryBuilder);
// Scheduled task that closes expired connections
this.connectionManagerTimer.schedule(new TimerTask() {
@Override
public void run() {
connectionManager.closeExpiredConnections();
}
}, 30000, timerRepeat);
return connectionManager;
}
public HttpClientConnectionManager newConnectionManager(boolean disableSslValidation, int maxTotalConnections, int maxConnectionsPerRoute, long timeToLive, TimeUnit timeUnit, RegistryBuilder registryBuilder) {
Registry<ConnectionSocketFactory> registry = registryBuilder.build();
PoolingHttpClientConnectionManager connectionManager = new PoolingHttpClientConnectionManager(registry, (HttpConnectionFactory)null, (SchemePortResolver)null, (DnsResolver)null, timeToLive, timeUnit);
// After creating the connectionManager, override maxTotal and defaultMaxPerRoute
connectionManager.setMaxTotal(maxTotalConnections);
connectionManager.setDefaultMaxPerRoute(maxConnectionsPerRoute);
return connectionManager;
}
public PoolingHttpClientConnectionManager(
final Registry<ConnectionSocketFactory> socketFactoryRegistry,
final HttpConnectionFactory<HttpRoute, ManagedHttpClientConnection> connFactory,
final SchemePortResolver schemePortResolver,
final DnsResolver dnsResolver,
final long timeToLive, final TimeUnit tunit) {
this(
new DefaultHttpClientConnectionOperator(socketFactoryRegistry, schemePortResolver, dnsResolver),
connFactory,
timeToLive, tunit
);
}
public PoolingHttpClientConnectionManager(
final HttpClientConnectionOperator httpClientConnectionOperator,
final HttpConnectionFactory<HttpRoute, ManagedHttpClientConnection> connFactory,
final long timeToLive, final TimeUnit tunit) {
super();
this.configData = new ConfigData();
this.pool = new CPool(new InternalConnectionFactory(
this.configData, connFactory), 2, 20, timeToLive, tunit);
this.pool.setValidateAfterInactivity(2000);
this.connectionOperator = Args.notNull(httpClientConnectionOperator, "HttpClientConnectionOperator");
this.isShutDown = new AtomicBoolean(false);
}
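As shown, after the PoolingHttpClientConnectionManager is created, maxTotal and defaultMaxPerRoute are overridden with values that can be set in the configuration file:
ribbon:
  MaxConnectionsPerHost: 200
  MaxTotalConnections: 500
This can be confirmed in a debugger:
A scheduled task handles expired connections: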
// PoolingHttpClientConnectionManager
public void closeExpiredConnections() {
this.log.debug("Closing expired connections");
this.pool.closeExpired();
}
// Close expired connections (those that have outlived their maximum lifetime)
public void closeExpired() {
final long now = System.currentTimeMillis();
enumAvailable(new PoolEntryCallback<T, C>() {
@Override
public void process(final PoolEntry<T, C> entry) {
if (entry.isExpired(now)) {
entry.close();
}
}
});
}
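The sweep above boils down to walking the idle connections and closing those whose expiry time has passed. A minimal stand-alone sketch of that idea follows. `ExpirySweeper` and `Entry` are hypothetical simplifications that leave out the locking and per-route bookkeeping of the real pool, and the time is passed in explicitly so the behavior is easy to follow.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

// Hypothetical, simplified model of a time-to-live sweep over idle pool entries.
class ExpirySweeper {

    static class Entry {
        final String id;
        final long expiryMillis; // creation time + time-to-live
        boolean closed;

        Entry(String id, long createdMillis, long timeToLiveMillis) {
            this.id = id;
            this.expiryMillis = createdMillis + timeToLiveMillis;
        }

        boolean isExpired(long now) {
            return now >= expiryMillis;
        }
    }

    /** Closes and removes every expired entry; returns how many were closed. */
    static int closeExpired(List<Entry> available, long now) {
        int closedCount = 0;
        for (Iterator<Entry> it = available.iterator(); it.hasNext(); ) {
            Entry entry = it.next();
            if (entry.isExpired(now)) {
                entry.closed = true;
                it.remove();
                closedCount++;
            }
        }
        return closedCount;
    }

    public static void main(String[] args) {
        List<Entry> available = new ArrayList<>();
        available.add(new Entry("old", 0, 900_000));       // expires at t = 900 s
        available.add(new Entry("new", 600_000, 900_000)); // expires at t = 1500 s
        System.out.println(closeExpired(available, 1_000_000)); // 1
        System.out.println(available.get(0).id);                // new
    }
}
```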
2. The connection pool in OkHttp (version 3.14.9)
public static final OkHttpClient defaultOkHttpClient = new OkHttpClient.Builder()
// Connection pool: at most 10 idle connections, 60-second keep-alive
.connectionPool(new ConnectionPool(10, 60, TimeUnit.SECONDS))
// Connect timeout
.connectTimeout(3, TimeUnit.SECONDS)
// Read timeout
.readTimeout(10, TimeUnit.SECONDS)
// Write timeout
.writeTimeout(10, TimeUnit.SECONDS)
.build();
Usually, customizing the ConnectionPool is all that is needed.
public ConnectionPool(int maxIdleConnections, long keepAliveDuration, TimeUnit timeUnit) {
this.delegate = new RealConnectionPool(maxIdleConnections, keepAliveDuration, timeUnit);
}
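maxIdleConnections: the maximum number of idle connections.
keepAliveDuration + timeUnit: the maximum idle time; a connection that stays idle longer than this is cleaned up.
A dedicated cleanup task evicts the longest-idle connection when its idle time has reached keepAliveDuration, or when the number of idle connections exceeds maxIdleConnections. Either condition alone is sufficient.
// Dedicated task that evicts expired connections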
private final Runnable cleanupRunnable = () -> {
while (true) {
long waitNanos = cleanup(System.nanoTime());
if (waitNanos == -1) return;
if (waitNanos > 0) {
long waitMillis = waitNanos / 1000000L;
waitNanos -= (waitMillis * 1000000L);
synchronized (RealConnectionPool.this) {
try {
RealConnectionPool.this.wait(waitMillis, (int) waitNanos);
} catch (InterruptedException ignored) {
}
}
}
}
};
// Evict expired connections
long cleanup(long now) {
int inUseConnectionCount = 0;
int idleConnectionCount = 0;
RealConnection longestIdleConnection = null;
long longestIdleDurationNs = Long.MIN_VALUE;
// Find either a connection to evict, or the time that the next eviction is due.
synchronized (this) {
for (Iterator<RealConnection> i = connections.iterator(); i.hasNext(); ) {
RealConnection connection = i.next();
// If the connection is in use, keep searching.
if (pruneAndGetAllocationCount(connection, now) > 0) {
inUseConnectionCount++;
continue;
}
idleConnectionCount++;
// If the connection is ready to be evicted, we're done.
long idleDurationNs = now - connection.idleAtNanos;
if (idleDurationNs > longestIdleDurationNs) {
longestIdleDurationNs = idleDurationNs;
longestIdleConnection = connection;
}
}
if (longestIdleDurationNs >= this.keepAliveDurationNs
|| idleConnectionCount > this.maxIdleConnections) {
// Evict when the longest-idle connection has been idle for at least keepAliveDurationNs, or when idle connections outnumber maxIdleConnections
connections.remove(longestIdleConnection);
} else if (idleConnectionCount > 0) {
// A connection will be ready to evict soon.
return keepAliveDurationNs - longestIdleDurationNs;
} else if (inUseConnectionCount > 0) {
// All connections are in use. It'll be at least the keep alive duration 'til we run again.
return keepAliveDurationNs;
} else {
// No connections, idle or in use.
cleanupRunning = false;
return -1;
}
}
closeQuietly(longestIdleConnection.socket());
// Cleanup again immediately.
return 0;
}
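The decision at the heart of cleanup can be isolated into a few lines. The sketch below is a simplified model and not OkHttp code: `CleanupModel` and `nextAction` are hypothetical names, and idle durations are passed in directly instead of being read from real connections. A return value of 0 means "evict the longest-idle connection now", a positive value is the wait in nanoseconds before the next check, and -1 means the cleanup loop can stop.

```java
// Simplified model of the eviction decision in OkHttp's cleanup(); hypothetical names.
class CleanupModel {

    /**
     * @param idleDurationsNs idle time of each idle connection, in nanoseconds
     * @param inUseCount      number of connections currently in use
     * @return 0 to evict now, a positive wait in nanoseconds, or -1 to stop cleaning
     */
    static long nextAction(long[] idleDurationsNs, int inUseCount,
                           long keepAliveNs, int maxIdleConnections) {
        long longestIdleNs = Long.MIN_VALUE;
        for (long idleNs : idleDurationsNs) {
            longestIdleNs = Math.max(longestIdleNs, idleNs);
        }
        int idleCount = idleDurationsNs.length;

        if (longestIdleNs >= keepAliveNs || idleCount > maxIdleConnections) {
            return 0; // either condition alone is enough to evict
        } else if (idleCount > 0) {
            return keepAliveNs - longestIdleNs; // wait until the longest-idle one is due
        } else if (inUseCount > 0) {
            return keepAliveNs; // everything is busy; check again after one keep-alive
        }
        return -1; // the pool is empty
    }

    public static void main(String[] args) {
        long second = 1_000_000_000L;
        long keepAliveNs = 60 * second;
        // Idle for 61 s: evicted even though only one connection is idle.
        System.out.println(nextAction(new long[] {61 * second}, 0, keepAliveNs, 10));
        // Three idle connections with maxIdleConnections = 2: evicted despite short idle times.
        System.out.println(nextAction(new long[] {second, 2 * second, 3 * second}, 0, keepAliveNs, 2));
        // One connection idle for 10 s: wait the remaining 50 s.
        System.out.println(nextAction(new long[] {10 * second}, 0, keepAliveNs, 10));
    }
}
```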
3. OpenFeign
OpenFeign supports both Apache HttpClient and OkHttp as its connection pool, so either can be chosen.
3.1 Using Apache HttpClient (OpenFeign version 2.1.0)
The pooled client is optional; to use Apache HttpClient, simply add the dependency.
<dependency>
<groupId>io.github.openfeign</groupId>
<artifactId>feign-httpclient</artifactId>
<version>xx.xx</version>
</dependency>
The pool is normally created through PoolingHttpClientConnectionManager.
@Import({ HttpClientFeignLoadBalancedConfiguration.class,
OkHttpFeignLoadBalancedConfiguration.class,
DefaultFeignLoadBalancedConfiguration.class })
public class FeignRibbonClientAutoConfiguration {
@Bean
@Primary
@ConditionalOnMissingBean
@ConditionalOnMissingClass("org.springframework.retry.support.RetryTemplate")
public CachingSpringLoadBalancerFactory cachingLBClientFactory(
SpringClientFactory factory) {
return new CachingSpringLoadBalancerFactory(factory);
}
@Bean
@ConditionalOnMissingBean
public Request.Options feignRequestOptions() {
return LoadBalancerFeignClient.DEFAULT_OPTIONS;
}
}
// HttpClientFeignLoadBalancedConfiguration
@Bean
@ConditionalOnMissingBean(HttpClientConnectionManager.class)
public HttpClientConnectionManager connectionManager(
ApacheHttpClientConnectionManagerFactory connectionManagerFactory,
FeignHttpClientProperties httpClientProperties) {
final HttpClientConnectionManager connectionManager = connectionManagerFactory
.newConnectionManager(httpClientProperties.isDisableSslValidation(), httpClientProperties.getMaxConnections(),
httpClientProperties.getMaxConnectionsPerRoute(),
httpClientProperties.getTimeToLive(),
httpClientProperties.getTimeToLiveUnit(), registryBuilder);
this.connectionManagerTimer.schedule(new TimerTask() {
@Override
public void run() {
connectionManager.closeExpiredConnections();
}
}, 30000, httpClientProperties.getConnectionTimerRepeat());
return connectionManager;
}
The defaults are again 200 connections in total and 50 per route, and they can be changed with the following configuration:
feign:
  httpclient:
    maxConnections: 600
    maxConnectionsPerRoute: 200
3.2 Using OkHttp (OpenFeign version 3.1.3)
public class FeignAutoConfiguration {
@Configuration(proxyBeanMethods = false)
@ConditionalOnClass(OkHttpClient.class)
@ConditionalOnMissingBean(okhttp3.OkHttpClient.class)
@ConditionalOnProperty("feign.okhttp.enabled")
protected static class OkHttpFeignConfiguration {
private okhttp3.OkHttpClient okHttpClient;
@Bean
@ConditionalOnMissingBean(ConnectionPool.class)
public ConnectionPool httpClientConnectionPool(FeignHttpClientProperties httpClientProperties,
OkHttpClientConnectionPoolFactory connectionPoolFactory) {
int maxTotalConnections = httpClientProperties.getMaxConnections();
long timeToLive = httpClientProperties.getTimeToLive();
TimeUnit ttlUnit = httpClientProperties.getTimeToLiveUnit();
return connectionPoolFactory.create(maxTotalConnections, timeToLive, ttlUnit);
}
@Bean
public okhttp3.OkHttpClient client(OkHttpClientFactory httpClientFactory, ConnectionPool connectionPool,
FeignHttpClientProperties httpClientProperties) {
boolean followRedirects = httpClientProperties.isFollowRedirects();
int connectTimeout = httpClientProperties.getConnectionTimeout();
boolean disableSslValidation = httpClientProperties.isDisableSslValidation();
Duration readTimeout = httpClientProperties.getOkHttp().getReadTimeout();
this.okHttpClient = httpClientFactory.createBuilder(disableSslValidation)
.connectTimeout(connectTimeout, TimeUnit.MILLISECONDS).followRedirects(followRedirects)
.readTimeout(readTimeout).connectionPool(connectionPool).build();
return this.okHttpClient;
}
@Bean
@ConditionalOnMissingBean(Client.class)
public Client feignClient(okhttp3.OkHttpClient client) {
return new OkHttpClient(client);
}
}
}
OkHttpFeignConfiguration is only created when the dependency is present and feign.okhttp.enabled=true is set.
Add the dependency:
<dependency>
<groupId>io.github.openfeign</groupId>
<artifactId>feign-okhttp</artifactId>
<version>11.8</version>
</dependency>
The expiry time and other settings can be changed with the following configuration:
feign:
  okhttp:
    enabled: true
  httpclient:
    time-to-live: 60
    ok-http:
      read-timeout: 10s
4. HttpClient in Spring Cloud Gateway
The GatewayAutoConfiguration class defines the HttpClientProperties and HttpClientFactory beans.
@ConditionalOnClass(DispatcherHandler.class)
public class GatewayAutoConfiguration {
@Bean
@ConditionalOnMissingBean({ HttpClient.class, HttpClientFactory.class })
public HttpClientFactory gatewayHttpClientFactory(HttpClientProperties properties,
ServerProperties serverProperties, List<HttpClientCustomizer> customizers) {
return new HttpClientFactory(properties, serverProperties, customizers);
}
@Bean
public HttpClientProperties httpClientProperties() {
return new HttpClientProperties();
}
@Bean
@ConditionalOnEnabledGlobalFilter
public NettyRoutingFilter routingFilter(HttpClient httpClient,
ObjectProvider<List<HttpHeadersFilter>> headersFilters, HttpClientProperties properties) {
return new NettyRoutingFilter(httpClient, headersFilters, properties);
}
}
HttpClientProperties sets the default pool type to ELASTIC, meaning the maximum number of connections is not limited and connections are created on demand.
public class HttpClientProperties {
public static class Pool {
/** Type of pool for HttpClient to use, defaults to ELASTIC. */
private PoolType type = PoolType.ELASTIC;
/**
* Only for type FIXED, the maximum number of connections before starting pending
* acquisition on existing ones.
*/
private Integer maxConnections = ConnectionProvider.DEFAULT_POOL_MAX_CONNECTIONS;
/**
* Time in millis after which the channel will be closed. If NULL, there is no max
* idle time.
*/
private Duration maxIdleTime = null;
/**
* Duration after which the channel will be closed. If NULL, there is no max life
* time.
*/
private Duration maxLifeTime = null;
}
}
The HttpClient is created by HttpClientFactory, and the pooling strategy is configured through ConnectionProvider.
public class HttpClientFactory {
protected HttpClient createInstance() {
// configure pool resources
ConnectionProvider connectionProvider = buildConnectionProvider(properties);
HttpClient httpClient = HttpClient.create(connectionProvider)
// TODO: move customizations to HttpClientCustomizers
.httpResponseDecoder(this::httpResponseDecoder);
...
return httpClient;
}
protected ConnectionProvider buildConnectionProvider(HttpClientProperties properties) {
HttpClientProperties.Pool pool = properties.getPool();
ConnectionProvider connectionProvider;
if (pool.getType() == DISABLED) {
connectionProvider = ConnectionProvider.newConnection();
}
else {
// create either Fixed or Elastic pool
ConnectionProvider.Builder builder = ConnectionProvider.builder(pool.getName());
if (pool.getType() == FIXED) {
builder.maxConnections(pool.getMaxConnections()).pendingAcquireMaxCount(-1)
.pendingAcquireTimeout(Duration.ofMillis(pool.getAcquireTimeout()));
}
else {
// ELASTIC: no limit on the number of connections
builder.maxConnections(Integer.MAX_VALUE).pendingAcquireTimeout(Duration.ofMillis(0))
.pendingAcquireMaxCount(-1);
}
if (pool.getMaxIdleTime() != null) {
builder.maxIdleTime(pool.getMaxIdleTime());
}
if (pool.getMaxLifeTime() != null) {
builder.maxLifeTime(pool.getMaxLifeTime());
}
builder.evictInBackground(pool.getEvictionInterval());
builder.metrics(pool.isMetrics());
connectionProvider = builder.build();
}
return connectionProvider;
}
}
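The branch on pool type above reduces to a small mapping from type to effective limits. The following stand-alone sketch restates it without reactor-netty on the classpath. `PoolType` here is a local enum and `EffectiveLimits` a hypothetical holder, and the values passed in `main` (500 connections, 45 s acquire timeout) are made-up examples rather than gateway defaults.

```java
// Stand-alone restatement of the pool-type branch; hypothetical types, no reactor-netty needed.
class EffectiveLimits {
    enum PoolType { ELASTIC, FIXED, DISABLED }

    final boolean pooled;
    final int maxConnections;
    final long pendingAcquireTimeoutMillis;

    private EffectiveLimits(boolean pooled, int maxConnections, long pendingAcquireTimeoutMillis) {
        this.pooled = pooled;
        this.maxConnections = maxConnections;
        this.pendingAcquireTimeoutMillis = pendingAcquireTimeoutMillis;
    }

    static EffectiveLimits of(PoolType type, int configuredMax, long acquireTimeoutMillis) {
        switch (type) {
            case DISABLED:
                // No pooling: every request opens a fresh connection.
                return new EffectiveLimits(false, 0, 0);
            case FIXED:
                // Bounded pool: callers wait up to the acquire timeout for a free connection.
                return new EffectiveLimits(true, configuredMax, acquireTimeoutMillis);
            default:
                // ELASTIC: effectively unbounded, so the configured maximum is ignored.
                return new EffectiveLimits(true, Integer.MAX_VALUE, 0);
        }
    }

    public static void main(String[] args) {
        EffectiveLimits elastic = of(PoolType.ELASTIC, 500, 45_000);
        EffectiveLimits fixed = of(PoolType.FIXED, 500, 45_000);
        System.out.println(elastic.maxConnections == Integer.MAX_VALUE); // true
        System.out.println(fixed.maxConnections);                        // 500
    }
}
```

In Spring Cloud Gateway itself these choices are made through the properties under spring.cloud.gateway.httpclient.pool, such as type, max-connections, max-idle-time, and max-life-time.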
PooledConnectionProvider provides the method for acquiring a connection.
public abstract class PooledConnectionProvider {
PooledConnectionProvider(Builder builder, @Nullable Clock clock) {
this.builder = builder;
this.name = builder.name;
this.inactivePoolDisposeInterval = builder.inactivePoolDisposeInterval;
this.poolInactivity = builder.poolInactivity;
this.disposeTimeout = builder.disposeTimeout;
this.defaultPoolFactory = new PoolFactory<>(builder, builder.disposeTimeout, clock);
for (Map.Entry<SocketAddress, ConnectionPoolSpec<?>> entry : builder.confPerRemoteHost.entrySet()) {
poolFactoryPerRemoteHost.put(entry.getKey(), new PoolFactory<>(entry.getValue(), builder.disposeTimeout));
maxConnections.put(entry.getKey(), entry.getValue().maxConnections);
}
this.onDispose = Mono.empty();
scheduleInactivePoolsDisposal();
}
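// Acquire a connection. (IDEA reported that the library source did not match the class bytecode, so navigation was unreliable and the source was not traced any further.)
// The logic appears to be: each target address (remoteAddress) has a poolFactory, and a connection pool (PooledConnectionAllocator) is created from that poolFactory.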
public final Mono<? extends Connection> acquire(
TransportConfig config,
ConnectionObserver connectionObserver,
@Nullable Supplier<? extends SocketAddress> remote,
@Nullable AddressResolverGroup<?> resolverGroup) {
return Mono.create(sink -> {
SocketAddress remoteAddress = Objects.requireNonNull(remote.get(), "Remote Address supplier returned null");
PoolKey holder = new PoolKey(remoteAddress, config.channelHash());
PoolFactory<T> poolFactory = poolFactory(remoteAddress);
InstrumentedPool<T> pool = MapUtils.computeIfAbsent(channelPools, holder, poolKey -> {
InstrumentedPool<T> newPool = createPool(config, poolFactory, remoteAddress, resolverGroup);
return newPool;
});
EventLoop eventLoop;
if (sink.contextView().hasKey(CONTEXT_CALLER_EVENTLOOP)) {
eventLoop = sink.contextView().get(CONTEXT_CALLER_EVENTLOOP);
}
else {
EventLoopGroup group = config.loopResources().onClient(config.isPreferNative());
if (group instanceof ColocatedEventLoopGroup) {
eventLoop = ((ColocatedEventLoopGroup) group).nextInternal();
}
else {
eventLoop = null;
}
}
Mono<PooledRef<T>> mono = pool.acquire(Duration.ofMillis(poolFactory.pendingAcquireTimeout));
if (eventLoop != null) {
mono = mono.contextWrite(ctx -> ctx.put(CONTEXT_CALLER_EVENTLOOP, eventLoop));
}
Context currentContext = Context.of(sink.contextView());
if ((poolFactory.metricsEnabled || config.metricsRecorder() != null)
&& Metrics.isMicrometerAvailable()) {
Object currentObservation = reactor.netty.Metrics.currentObservation(currentContext);
if (currentObservation != null) {
currentContext = reactor.netty.Metrics.updateContext(currentContext, currentObservation);
mono = mono.contextWrite(ctx -> reactor.netty.Metrics.updateContext(ctx, currentObservation));
}
}
mono.subscribe(createDisposableAcquire(config, connectionObserver,
poolFactory.pendingAcquireTimeout, pool, sink, currentContext));
});
}
}
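The key idea in acquire is one pool per remote address, created lazily on first use. A minimal stand-alone sketch of that pattern with `ConcurrentHashMap.computeIfAbsent` follows. `PerHostPools` and `SimplePool` are hypothetical stand-ins for reactor-netty's internal types, and the real pool key also includes a channel hash, which is omitted here.

```java
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical sketch: one lazily created pool per remote address.
class PerHostPools {

    static class SimplePool {
        final SocketAddress remote;

        SimplePool(SocketAddress remote) {
            this.remote = remote;
        }
    }

    private final Map<SocketAddress, SimplePool> pools = new ConcurrentHashMap<>();

    /** Returns the pool for the address, creating it on first use. */
    SimplePool poolFor(SocketAddress remote) {
        return pools.computeIfAbsent(remote, SimplePool::new);
    }

    int poolCount() {
        return pools.size();
    }

    public static void main(String[] args) {
        PerHostPools provider = new PerHostPools();
        SocketAddress a = InetSocketAddress.createUnresolved("operation-service", 8080);
        SocketAddress b = InetSocketAddress.createUnresolved("other-service", 8080);

        System.out.println(provider.poolFor(a) == provider.poolFor(a)); // true: same pool is reused
        System.out.println(provider.poolFor(a) == provider.poolFor(b)); // false: separate pool per host
        System.out.println(provider.poolCount());                       // 2
    }
}
```

Because each address gets its own pool, a backlog on one downstream service does not consume connections that belong to another.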
III. Summary
Parameter | HttpClient in Zuul 1.0 | OkHttpClient | reactor-netty HttpClient in Spring Cloud Gateway |
---|---|---|---|
Maximum connections | 200 | - | - |
Maximum connections per route | 50 | - | - |
Maximum idle connections | - | custom, e.g. 1 | - |
Maximum idle time | 30s | custom, e.g. 60s | custom |
Maximum lifetime | 900s | - | custom |
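The commonly used connection pools are mainly Apache HttpClient and OkHttpClient; Spring Cloud Gateway, in addition, uses the HttpClient from reactor-netty.
Feign supports both the Apache HttpClient and OkHttpClient connection pools, so either can be chosen.
As shown, neither OkHttpClient nor the reactor-netty HttpClient is concerned with a maximum number of connections; both focus mainly on the maximum idle time. Sometimes offering too many tunable parameters only makes the choice harder.
Take the maximum number of connections: what is a reasonable value? Nobody knows before launch how much concurrency an endpoint will see, so to be safe it is best set high. The per-route maximum should follow each microservice's traffic, but how is that determined?
Since it cannot be determined, one may as well not limit it. Creating a connection does not cost much, as long as connections are not created and destroyed constantly. If a traffic burst leaves a large number of connections in the pool, setting a maximum idle time gets them cleaned up promptly.
Reposted from: https://juejin.cn/post/7325623087209906214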