likes
comments
collection
share

生产环境的Kafka无法正常消费了——跨机房消费和一个消费者配置项惹的祸

作者站长头像
站长
· 阅读数 6

生产环境的Kafka无法正常消费了——跨机房消费和一个消费者配置项惹的祸

本文先记录了一次在生产环境中排查并解决Kafka无法正常消费的过程,然后介绍如何复现此次故障,最后结合源代码分析出故障的原因。

先来介绍一下生产环境的服务配置。由于历史原因,我们的Kafka集群和消费这个集群的服务(由Go语言编写,以下简称为“消费者服务”)是跨机房部署的,这两个跨越大洋的机房相距大约1万公里。

故障的最初表现是收到消费者服务对应的Kafka Topic Lag超过阈值的报警,于是查看消费者服务的错误日志,发现有如下所示的大量TCP read timeout的错误,且消费者服务几乎不消费了,进而产生了更多Lag超过阈值的报警。

kafka: error while consuming test/1: read tcp <Kafka broker IP>:54953-><Kafka broker IP>:9092: i/o timeout

本文用<Kafka broker IP>替代了实际的IP地址

排查原因

在无脑重启消费者服务后发现Lag没有下降,TCP read timeout的错误也没有消失。考虑到是跨机房消费Kafka,最先想到的是网络丢包引起的问题,于是ping了Kafka的Broker,发现没有丢包,而且ICMP数据包的时延是100多毫秒,也在正常范围内(以光速通过1万公里都需要33.33毫秒)。

既然没有丢包,说明网络层OK,那会不会是传输层的问题呢?于是先后运行nctcpdump命令。

$ nc -v -z -w 1 <Kafka broker IP> 9092
Connection to <Kafka broker IP> 9092 port [tcp/*] succeeded!

$ sudo tcpdump 'port 9092 and host <Kafka broker IP>' -vvv

nc命令的结果表示TCP连接建立成功(-v是输出更多信息;-z是扫描监听状态的端口,且不向其发送任何数据;-w 1是建立连接的超时时间)。从tcpdump的输出中也能找到三次握手的TCP数据包序列——Flags字段分别为[S][S.][.](分别表示SYN、SYN+ACK和ACK),以及收发数据的序列——Flags字段分别为[P.][.](分别表示PUSH和ACK)。

看起来网络层和传输层都正常,而且负责Kafka的同学反馈集群正常。哎,似乎陷入了僵局,只能再从Kafka消费者的配置项上入手,碰碰运气了。

消费者服务使用的是github.com/bsm/sarama-clustergithub.com/Shopify/sarama(Version 1.11.0 (2016-12-20),确实够老的)这两个库来消费Kafka Topic的。Read timeout的配置项采用的是默认值,定义在下面的代码中,

// .../vendor/github.com/Shopify/sarama/config.go

// NewConfig returns a new configuration instance with sane defaults.
func NewConfig() *Config {
    c := &Config{}

    c.Net.MaxOpenRequests = 5
    c.Net.DialTimeout = 30 * time.Second
    c.Net.ReadTimeout = 30 * time.Second
    c.Net.WriteTimeout = 30 * time.Second
    // ...

很可能就是从Kafka Broker读消息时超过了这里的30秒,导致满屏的TCP read timeout报错

kafka: error while consuming test/1: read tcp <Kafka broker IP>:54953-><Kafka broker IP>:9092: i/o timeout

另外,为了提升跨机房消费Kafka的吞吐量,我们曾增大了Consumer.Fetch.Default这个参数,远比默认值32768要大。

// Consumer is the namespace for configuration related to consuming messages,
// used by the Consumer.
//
// ...
Consumer struct {
    // ...

    // Fetch is the namespace for controlling how many bytes are retrieved by any
    // given request.
    Fetch struct {
        // ...
        // The default number of message bytes to fetch from the broker in each
        // request (default 32768). This should be larger than the majority of
        // your messages, or else the consumer will spend a lot of time
        // negotiating sizes and not actually consuming. Similar to the JVM's
        // `fetch.message.max.bytes`.
        Default int32
        // ...
    }

那会不会是因为在当前的网络状况下(事后才知道那时网络发生了什么),单次Fetch请求获取了过多字节数的消息,又因网速慢使得消费者服务读不过来,导致TCP read timeout呢?

于是我们尝试改小了这个配置,重启服务后,可以正常消费了!

复现故障

看起来是改小了Consumer.Fetch.Default这个参数,消费者服务就恢复正常了,但这背后的原因是什么呢?

为了复现这个故障,并验证假设(单次请求获取了过多字节数的消息,网速慢使得消费者服务读不过来),我们先在虚拟机上搭建了一个3节点的Kafka集群,然后写了一段最简单的消费者代码。

package main

import (
    "fmt"
    "log"
    "os"
    "time"

    "github.com/Shopify/sarama"
    cluster "github.com/bsm/sarama-cluster"
)

func main() {
    var logger = log.New(os.Stdout, "[sarama] ", log.LstdFlags)

    brokerAddrs := []string{
        "192.168.33.10:9092",    // 虚拟机上的Kafka Broker
    }
    topic := "test"
    groupId := "test-consumer-group"

    config := cluster.NewConfig()
    config.Consumer.Return.Errors = true
    config.Group.Return.Notifications = true
    config.Consumer.Offsets.Initial = sarama.OffsetNewest
    config.Consumer.Offsets.CommitInterval = 5 * time.Second
    config.Consumer.Fetch.Default = 10485760 // ⚠️注意这个配置项

    consumer, err := cluster.NewConsumer(brokerAddrs, groupId, []string{topic}, config)
    if err != nil {
        fmt.Println(err)
    }

    logger.Println("consumer started")
    for {
        select {
        case msg, ok := <-consumer.Messages():
            if ok {
                logger.Println("consuming:", string(msg.Value), string(msg.Key))
                consumer.MarkOffset(msg, "")
            }
        case e, ok := <-consumer.Errors():
            if ok {
                logger.Println("error", e)
            }
        case ntf, ok := <-consumer.Notifications():
            if ok {
                logger.Println("notification", ntf)
            }
        }
    }
}

除此以外,还有两个因素需要模拟,一个是Kafka Topic中有大量待消费的消息,一个是网速慢。前者可以借助kafkacat(github.com/edenhill/kc…)这个工具写入大量测试数据,后者可以使用wondershaper(github.com/magnific0/w…)这个工具限制网卡收发数据的速度。

# 生成大量测试消息,并写入文件
$ for i in $(seq 1 1000000); do echo "test${i}"; done > kafka-test-message.log

# 将文件kafka-test-message.log中的文本按照\n分割成一条条消息,写入名为`test`的Kafka Topic中(有3个Partition)
$ kafkacat -b "192.168.33.10:9092" -P -t test -l -D '\n' kafka-test-message.log
# 安装wondershaper
# 参考 https://unix.stackexchange.com/questions/28198/how-to-limit-network-bandwidth
$ sudo apt-get install wondershaper

$ ifconfig
enp0s3: flags=4163<UP,BROADCAST,RUNNING,MULTICAST>  mtu 1500
...

# 本机使用enp0s8这块网卡和虚拟机通信
enp0s8: flags=4163<UP,BROADCAST,RUNNING,MULTICAST>  mtu 1500
        inet 192.168.33.10  netmask 255.255.255.0  broadcast 192.168.33.255

# 限制网速,接收和发送速度都降为64kbps
$ sudo wondershaper enp0s8 64 64

接下来运行这个最简单的消费者服务,果然看到了TCP read timeout,而且根本不消费了,故障复现了!

[sarama] 2023/11/21 18:36:57 consumer started
[sarama] 2023/11/21 18:37:13 notification &{map[test:[0 1 2]] map[] map[test:[0 1 2]]}
[sarama] 2023/11/21 18:37:43 error read tcp 192.168.33.1:57410->192.168.33.10:9092: i/o timeout
[sarama] 2023/11/21 18:37:43 error kafka: error while consuming test/1: read tcp 192.168.33.1:57410->192.168.33.10:9092: i/o timeout
[sarama] 2023/11/21 18:37:43 error kafka: error while consuming test/2: read tcp 192.168.33.1:57434->192.168.33.10:9093: i/o timeout
[sarama] 2023/11/21 18:37:43 error kafka: error while consuming test/0: read tcp 192.168.33.1:57435->192.168.33.10:9091: i/o timeout

提出假设

也就是说,应该是在网速较慢,且Consumer.Fetch.Default这个参数过大时,消费者就会因TCP read timeout而停止消费

如果这个结论成立,那么就应该存在一个临界值,Consumer.Fetch.Default小于这个临界值就(大概率)能正常消费,大于了就(大概率)停止消费了。64kbps这里是小b,所以理论上在30秒内最大传输字节数应该是64 / 8 * 1000 * 30

# config.Consumer.Fetch.Default = 3 * 1000 * 30
[sarama] 2023/11/21 19:04:41 consuming: test6402 
[sarama] 2023/11/21 19:04:41 consuming: test6407 
[sarama] 2023/11/21 19:04:41 consuming: test6408 
[sarama] 2023/11/21 19:04:41 consuming: test6413 
[sarama] 2023/11/21 19:04:41 consuming: test6414 
[sarama] 2023/11/21 19:04:42 error kafka: error while consuming test/0: read tcp 192.168.33.1:58020->192.168.33.10:9091: i/o timeout

多次尝试后,可以看到,当config.Consumer.Fetch.Default = 3 * 1000 * 30出现了TCP read timeout报错和正常消费同时出现的情况。不难想象,如果将这个参数设置得更小就更有可能正常消费,反之失败的情况将更多。

其实这里还有一个因素,那就是Kafka Topic中要有足够多的消息。如果只有很少的消息,即使网速又慢,Consumer.Fetch.Default这个参数又过大,也还是能正常消费的,例如,

$ for i in $(seq 1 100); do echo "test${i}"; done > kafka-test-message.log
$ ll
...
-rw-rw-r--  1 vagrant vagrant      692 Nov 21 10:46  kafka-test-message.log

$ go run kafka_debug.go
[sarama] 2023/11/21 18:47:55 consumer started
[sarama] 2023/11/21 18:47:55 notification &{map[test:[0 1 2]] map[] map[test:[0 1 2]]}
[sarama] 2023/11/21 18:48:06 consuming: test2 
[sarama] 2023/11/21 18:48:06 consuming: test6 
[sarama] 2023/11/21 18:48:06 consuming: test8 
[sarama] 2023/11/21 18:48:06 consuming: test10 
[sarama] 2023/11/21 18:48:06 consuming: test16 

通过源代码验证假设

下面我们通过阅读源代码来验证我们的假设——在网速较慢,且Consumer.Fetch.Default这个参数过大时,消费者就会因TCP read timeout而停止消费。

首先,设置read timeout的代码是,

// .../vendor/github.com/Shopify/sarama/config.go

// NewConfig returns a new configuration instance with sane defaults.
func NewConfig() *Config {
    c := &Config{}

    c.Net.MaxOpenRequests = 5
    c.Net.DialTimeout = 30 * time.Second
    c.Net.ReadTimeout = 30 * time.Second

由此不难找到使用了这个Net.ReadTimeout的部分:

// .../vendor/github.com/Shopify/sarama/broker.go

func (b *Broker) responseReceiver() {
		// ...
    header := make([]byte, 8)
    for response := range b.responses {
				// ...
        err := b.conn.SetReadDeadline(time.Now().Add(b.conf.Net.ReadTimeout)) // ①
        if err != nil {
            // ...
        }

        bytesReadHeader, err := io.ReadFull(b.conn, header) // ②
        // ...

        decodedHeader := responseHeader{}
        err = decode(header, &decodedHeader)
        if err != nil {
            // ...
        }
        // ...

        buf := make([]byte, decodedHeader.length-4)
        bytesReadBody, err := io.ReadFull(b.conn, buf)	// ③
        b.updateIncomingCommunicationMetrics(bytesReadHeader+bytesReadBody, requestLatency)
        if err != nil {
            // ...
        }

        response.packets <- buf
    }
    close(b.done)
}

其中核心的部分是

b.conn.SetReadDeadline(time.Now().Add(b.conf.Net.ReadTimeout)) 	// ①

bytesReadHeader, err := io.ReadFull(b.conn, header) 						// ②
bytesReadBody, err := io.ReadFull(b.conn, buf)									// ③

下面重点分析这几行代码。b.conn的类型是Go内置的类型interface net.Conn,实际类型是struct net.conn(小写c)。该类型的定义如下,

// /usr/local/Cellar/go@1.18/1.18.10/libexec/src/net/net.go
type conn struct {
	fd *netFD
}

SetReadDeadline()的定义如下所示,在该函数中,对struc conn唯一的成员fd调用了同名的函数SetReadDeadline()

// /usr/local/Cellar/go@1.18/1.18.10/libexec/src/net/net.go

// SetReadDeadline implements the Conn SetReadDeadline method.
func (c *conn) SetReadDeadline(t time.Time) error {
	if !c.ok() {
		return syscall.EINVAL
	}
	if err := c.fd.SetReadDeadline(t); err != nil {
		return &OpError{Op: "set", Net: c.fd.net, Source: nil, Addr: c.fd.laddr, Err: err}
	}
	return nil
}

而从②③两处调用的io.ReadFull()函数的注释可以看出,

// /usr/local/Cellar/go@1.18/1.18.10/libexec/src/io/io.go

// ReadFull reads exactly len(buf) bytes from r into buf.
// It returns the number of bytes copied and an error if fewer bytes were read.
// The error is EOF only if no bytes were read.
// If an EOF happens after reading some but not all the bytes,
// ReadFull returns ErrUnexpectedEOF.
// On return, n == len(buf) if and only if err == nil.
// If r returns an error having read at least len(buf) bytes, the error is dropped.
func ReadFull(r Reader, buf []byte) (n int, err error) {
	return ReadAtLeast(r, buf, len(buf))
}

func ReadAtLeast(r Reader, buf []byte, min int) (n int, err error) {
	if len(buf) < min {
		return 0, ErrShortBuffer
	}
	for n < min && err == nil {
		var nn int
    nn, err = r.Read(buf[n:]) // net.conn.Read()
		n += nn
	}
  // ...

该函数一旦读不够len(buf)这么多字节,就会返回错误。那这个错误什么时候是TCP read timeout呢,也就是b.conn.SetReadDeadline()io.ReadFull() / ReadAtLeast()是怎么联动起来的呢?

答案就在这里:io.ReadAtLeast()参数中的Reader r的实际类型也是net.conn(c小写,没错该类型既实现了Conn接口又实现了Reader接口),r.Read()实际是对net.conn调用Read(),而net.connRead()定义如下:

// /usr/local/Cellar/go@1.18/1.18.10/libexec/src/net/net.go

// Read implements the Conn Read method.
func (c *conn) Read(b []byte) (int, error) {
	if !c.ok() {
		return 0, syscall.EINVAL
	}
	n, err := c.fd.Read(b)
	if err != nil && err != io.EOF {
		err = &OpError{Op: "read", Net: c.fd.net, Source: c.fd.laddr, Addr: c.fd.raddr, Err: err}
	}
	return n, err
}

这里又是对fd这个net.conn中唯一的成员调用了同名函数Read(),即SetReadDeadline()Read()都是对fd调用的,fd在从网络上读取数据时自然就能知道有关读取超时时间的设置了。

而这里的OpError最终会被Sarama转换成ConsumerError,而该类型的Error()返回的就是错误日志中的内容,

func (ce ConsumerError) Error() string {
	return fmt.Sprintf("kafka: error while consuming %s/%d: %s", ce.Topic, ce.Partition, ce.Err)
}

// 对比错误日志
// [sarama] 2023/11/21 18:37:43 error kafka: error while consuming test/1: read tcp 192.168.33.1:57410->192.168.33.10:9092: i/o timeout

由此可见,只要在conn.SetReadDeadline()设置的读超时(read timeout)时间内,io.ReadFull()没有读完足够多的数据(Kafka消费者的配置项config.Consumer.Fetch.Default指定的数据量),就会报错。其实这部分逻辑与Sarama这个库的关系不大,是Go内置的网络包的逻辑,Sarama只是格式化了错误信息。

另外,如果在这个位置加入断点,

        buf := make([]byte, decodedHeader.length-4)
        bytesReadBody, err := io.ReadFull(b.conn, buf)	// ③

会发现在读取Fetch请求的响应时,buf的大小并不是config.Consumer.Fetch.Default指定的值,大概是该值的80%,其中的原因还需要进一步研究。

结论

后来我们才知道当时是两个机房间的网络带宽出现了异常,难怪ping没有丢包,nctcpdump的输出看起来也正常(类比小河还在流,只是流得慢了),但就是不消费Kafka了。好在之前为了提升跨机房Kafka消费的吞吐量,关注过config.Consumer.Fetch.Default这个选项,并连蒙带猜地调小了这个参数。

通过事后对Go网络包代码的分析,又多了个经验:虽然TCP的数据是流式的,可能在指定的读超时时间内或多或少能读到些数据(不至于白等30秒),但若通过io.ReadFull()读取的话,一旦在指定的读超时时间内没有读到足够多的数据,还是会报错的。不是只有小河断流了才会报错,有时候河水流得慢了也会报错

转载自:https://juejin.cn/post/7324630101873688611
评论
请登录