kafka Topic not present in metadata after 200 ms 引发的思考(上)
前言
有天有位同学跟我说,一直没收到我的“消息”,然后看了眼日志:
发现线上居然有几十个kafka异常,其中就有我的topic send时的异常:
throwable
:[org.apache.kafka.common.errors.TimeoutException]():[]() [Topic]() [xxx]() [not]() [present]() [in]() [metadata]() [after]() [200]() [ms.]()
拉长了下时间发现三十天那有几十个异常,也就是丢失了几十个消息,不允许,我们下面来看看怎么解决下。
我们使用的kafka clinet版本为2.3.1,kafka版本为2.2.0。
解决思路
根据错误日志,是发送消息时找不到topic,在producer寻找元信息时超时了,我们来看看配置:
spring:
# 放kafka的通用配置
kafka:
producer:
#额外参数 见https://kafka.apache.org/documentation/#producerconfigs
properties:
max:
block:
ms: 200
现在配置的是200ms。
The configuration controls how long the `KafkaProducer`'s `send()`, `partitionsFor()`, `initTransactions()`, `sendOffsetsToTransaction()`, `commitTransaction()` and `abortTransaction()` methods will block. For `send()` this timeout bounds the total time waiting for both metadata fetch and buffer allocation (blocking in the user-supplied serializers or partitioner is not counted against this timeout). For `partitionsFor()` this timeout bounds the time spent waiting for metadata if it is unavailable. The transaction-related methods always block, but may timeout if the transaction coordinator could not be discovered or did not respond within the timeout.
| Type: | long |
| ------------- | ---------------- |
| Default: | 60000 (1 minute) |
| Valid Values: | [0,...] |
| Importance: | medium
可以看到默认的为1分钟,200ms确实有点短,先搞长点,试了下调成1000ms。
但是才过几天还有发现有一条。
现在有两个方法:
- 继续调高超时时间,后续观察
- 加入重试
继续调高超时时间
首先要弄清楚每次send都需要进行metadata fetch
吗?
在文档中搜索了下相关的配置:
The period of time in milliseconds after which we force a refresh of metadata even if we haven't seen any partition leadership changes to proactively discover any new brokers or partitions.
| Type: | long |
| ------------- | ------------------ |
| Default: | 300000 (5 minutes) |
| Valid Values: | [0,...] |
| Importance: | low
多长时间获取一次,五分钟已经很长了,不需要调整。
Controls how long the producer will cache metadata for a topic that's idle. If the elapsed time since a topic was last produced to exceeds the metadata idle duration, then the topic's metadata is forgotten and the next access to it will force a metadata fetch request.
Type: | long |
| ------------- | ------------------ |
| Default: | 300000 (5 minutes) |
| Valid Values: | [5000,...] |
| Importance: | low
5分钟和metadata.max.age.ms
一样,就算没有空闲,也会因为保存时间超时重新获取。
结论
相关的配置目前还是很合理的,可以把max.block.ms
再调整大点,10000ms好了。
加入重试
spring:
kafka:
producer:
#额外参数 见https://kafka.apache.org/documentation/#producerconfigs
properties:
#update metadata 超时时间
max:
block:
ms: 1000
retry:
backoff:
ms: 200
retries: 3
重试三次,间隔200ms。
找到匹配版本的文档看下,retries:
重试总时间受delivery.timeout.ms限制,官方建议使用delivery.timeout.ms来进行控制:
默认为两分钟,如果遇到不可恢复的错误、重试次数已经用完,或者因为其他配置触发了限制,就不会等待该时间才失败。
结论1
目前看来默认的重试值和超时时间,其实都不需要进行修改。
确认是否是可重试异常
日志里面没有看到进行重试,这是为啥呢?根据日志看到异常是org.apache.kafka.common.errors.TimeoutException
看看是否是可重试异常:
package org.apache.kafka.common.errors;
/**
* Indicates that a request timed out.
*/
public class TimeoutException extends RetriableException {
private static final long serialVersionUID = 1L;
public TimeoutException() {
super();
}
public TimeoutException(String message, Throwable cause) {
super(message, cause);
}
public TimeoutException(String message) {
super(message);
}
public TimeoutException(Throwable cause) {
super(cause);
}
}
发现是的,那为啥不进行重试,还是要看看源码。偷个懒直接google搜下:
发现没有直接答案,问下gpt:
找下doSend方法,发现没有,那就在这个类中搜下retry,打个断点:
发现是一个独立的线程处理的(kafka send消息不会发送一个向Broker发送一次,会放入队列中批量发送,这个以后再详细介绍),reties、retryBackoffMs的默认值可以看到和文档中描述的一样,至于错误没看明白为啥是NONE,不过可以确认不是可重试异常:
可以稍微看一眼canRetry方法:
private boolean canRetry(ProducerBatch batch, ProduceResponse.PartitionResponse response, long now) {
return !batch.hasReachedDeliveryTimeout(accumulator.getDeliveryTimeoutMs(), now) &&
batch.attempts() < this.retries &&
!batch.isDone() &&
((response.error.exception() instanceof RetriableException) ||
(transactionManager != null && transactionManager.canRetry(response, batch)));
}
需要:
- 没有达到deliverTimeout,这个前面有提到
- ProducerBatch.attempts小于重试次数
- ProducerBatch还没有完成
- 异常是否是可重试异常或事务是否可重试
结论2
当发生异常:
throwable
:[org.apache.kafka.common.errors.TimeoutException]():[]() [Topic]() [xxx]() [not]() [present]() [in]() [metadata]() [after]() [200]() [ms.]()
不能利用重试机制进行补偿
结论
直接使用metadata.max.age.ms的默认值就好了,不要指定了。
扩展
虽然有了最终结论,但是很好奇,为啥它不是可重试异常?
- 不确定具体原因,可能确实没有这个topic
- 可能是kafka内部出现了不可恢复的异常,导致无法拿到topic信息
- 重试可能会导致数据重复,需要控制幂等
但是说实话我觉得可以重试,控制幂等就行,需要做的工作:
- 监听这个异常
- 进行重试
- 控制幂等
转载自:https://juejin.cn/post/7244470938870923322