likes
comments
collection
share

kafka Topic not present in metadata after 200 ms 引发的思考(上)

作者站长头像
站长
· 阅读数 10

前言

有天有位同学跟我说,一直没收到我的“消息”,然后看了眼日志: kafka Topic not present in metadata after 200 ms 引发的思考(上) 发现线上居然有几十个kafka异常,其中就有我的topic send时的异常:

throwable

:[org.apache.kafka.common.errors.TimeoutException]():[]() [Topic]() [xxx]() [not]() [present]() [in]() [metadata]() [after]() [200]() [ms.]()

拉长了下时间发现三十天那有几十个异常,也就是丢失了几十个消息,不允许,我们下面来看看怎么解决下。

我们使用的kafka clinet版本为2.3.1,kafka版本为2.2.0。

解决思路

根据错误日志,是发送消息时找不到topic,在producer寻找元信息时超时了,我们来看看配置:

spring:
  # 放kafka的通用配置
  kafka:
    producer:
      #额外参数 见https://kafka.apache.org/documentation/#producerconfigs
      properties:
        max:
          block:
            ms: 200

现在配置的是200ms。

max.block.ms

The configuration controls how long the `KafkaProducer`'s `send()`, `partitionsFor()`, `initTransactions()`, `sendOffsetsToTransaction()`, `commitTransaction()` and `abortTransaction()` methods will block. For `send()` this timeout bounds the total time waiting for both metadata fetch and buffer allocation (blocking in the user-supplied serializers or partitioner is not counted against this timeout). For `partitionsFor()` this timeout bounds the time spent waiting for metadata if it is unavailable. The transaction-related methods always block, but may timeout if the transaction coordinator could not be discovered or did not respond within the timeout.

| Type:         | long             |
| ------------- | ---------------- |
| Default:      | 60000 (1 minute) |
| Valid Values: | [0,...]          |
| Importance:   | medium

可以看到默认的为1分钟,200ms确实有点短,先搞长点,试了下调成1000ms。

但是才过几天还有发现有一条。

现在有两个方法:

  • 继续调高超时时间,后续观察
  • 加入重试

继续调高超时时间

首先要弄清楚每次send都需要进行metadata fetch吗?

在文档中搜索了下相关的配置:

The period of time in milliseconds after which we force a refresh of metadata even if we haven't seen any partition leadership changes to proactively discover any new brokers or partitions.

| Type:         | long               |
| ------------- | ------------------ |
| Default:      | 300000 (5 minutes) |
| Valid Values: | [0,...]            |
| Importance:   | low

多长时间获取一次,五分钟已经很长了,不需要调整。

Controls how long the producer will cache metadata for a topic that's idle. If the elapsed time since a topic was last produced to exceeds the metadata idle duration, then the topic's metadata is forgotten and the next access to it will force a metadata fetch request.
Type:         | long               |
| ------------- | ------------------ |
| Default:      | 300000 (5 minutes) |
| Valid Values: | [5000,...]         |
| Importance:   | low

5分钟和metadata.max.age.ms一样,就算没有空闲,也会因为保存时间超时重新获取。

结论

相关的配置目前还是很合理的,可以把max.block.ms再调整大点,10000ms好了。

加入重试

spring:
  kafka:
    producer:
      #额外参数 见https://kafka.apache.org/documentation/#producerconfigs
      properties:
        #update metadata 超时时间
        max:
          block:
            ms: 1000
        retry:
          backoff:
            ms: 200
      retries: 3

重试三次,间隔200ms。

找到匹配版本的文档看下,retries: kafka Topic not present in metadata after 200 ms 引发的思考(上) 重试总时间受delivery.timeout.ms限制,官方建议使用delivery.timeout.ms来进行控制: kafka Topic not present in metadata after 200 ms 引发的思考(上) 默认为两分钟,如果遇到不可恢复的错误、重试次数已经用完,或者因为其他配置触发了限制,就不会等待该时间才失败。

结论1

目前看来默认的重试值和超时时间,其实都不需要进行修改。

确认是否是可重试异常

日志里面没有看到进行重试,这是为啥呢?根据日志看到异常是org.apache.kafka.common.errors.TimeoutException看看是否是可重试异常:

package org.apache.kafka.common.errors;

/**
 * Indicates that a request timed out.
 */
public class TimeoutException extends RetriableException {

    private static final long serialVersionUID = 1L;

    public TimeoutException() {
        super();
    }

    public TimeoutException(String message, Throwable cause) {
        super(message, cause);
    }

    public TimeoutException(String message) {
        super(message);
    }

    public TimeoutException(Throwable cause) {
        super(cause);
    }

}

发现是的,那为啥不进行重试,还是要看看源码。偷个懒直接google搜下: kafka Topic not present in metadata after 200 ms 引发的思考(上) 发现没有直接答案,问下gpt: kafka Topic not present in metadata after 200 ms 引发的思考(上) 找下doSend方法,发现没有,那就在这个类中搜下retry,打个断点: kafka Topic not present in metadata after 200 ms 引发的思考(上) 发现是一个独立的线程处理的(kafka send消息不会发送一个向Broker发送一次,会放入队列中批量发送,这个以后再详细介绍),reties、retryBackoffMs的默认值可以看到和文档中描述的一样,至于错误没看明白为啥是NONE,不过可以确认不是可重试异常: kafka Topic not present in metadata after 200 ms 引发的思考(上) 可以稍微看一眼canRetry方法:

private boolean canRetry(ProducerBatch batch, ProduceResponse.PartitionResponse response, long now) {
    return !batch.hasReachedDeliveryTimeout(accumulator.getDeliveryTimeoutMs(), now) &&
        batch.attempts() < this.retries &&
        !batch.isDone() &&
        ((response.error.exception() instanceof RetriableException) ||
            (transactionManager != null && transactionManager.canRetry(response, batch)));
}

需要:

  • 没有达到deliverTimeout,这个前面有提到
  • ProducerBatch.attempts小于重试次数
  • ProducerBatch还没有完成
  • 异常是否是可重试异常或事务是否可重试

结论2

当发生异常:

throwable

:[org.apache.kafka.common.errors.TimeoutException]():[]() [Topic]() [xxx]() [not]() [present]() [in]() [metadata]() [after]() [200]() [ms.]()

不能利用重试机制进行补偿

结论

直接使用metadata.max.age.ms的默认值就好了,不要指定了。

扩展

虽然有了最终结论,但是很好奇,为啥它不是可重试异常?

  • 不确定具体原因,可能确实没有这个topic
  • 可能是kafka内部出现了不可恢复的异常,导致无法拿到topic信息
  • 重试可能会导致数据重复,需要控制幂等

但是说实话我觉得可以重试,控制幂等就行,需要做的工作:

  • 监听这个异常
  • 进行重试
  • 控制幂等