Kafka基本概念和使用API(附图易理解)
一、重要概念
下图以某个 Consumer Group
订阅的某个 topic
(该 topic 有 partition1 和 partition2 两个分区)为例
1.1 Kafka 的高性能、高扩展性和高可用的体现
- 【高性能的体现】一个 topic 分成多个 partition,消费者的消费以 分区 为单位。避免数据的堆积,提升了吞吐量。
- 【高扩展性的体现】将 partition 分散部署到多台机器(即,broker)上,可以通过增加 broker 缓解机器 cpu 过高带来的性能问题。避免多个 partition 部署在同一个机器上,导致单机 cpu 和内存过高,影响系统整体的性能。
- 【高可用的体现】将 partition 分为 leader 和 follower,前者负责生产和消费,后者负责备份前者。而且,将 Leader 和 Follower 分散到不同的 broker 上,即使 Leader 所在的 broker 挂了,也不会影响到 Follower 所在的 broker, 并且还能从 Follower 中选举出一个新的 Leader partition 顶上。此外,将 partition 持久化的磁盘中,即使 broker 都挂了,重启服务后,也能从磁盘里读出数据,继续工作。
1.2 其他组件作用:
-
消费者组:关心某些 topic 的业务方(比如说在创建用户的时候,搜索部门关心新用户, 推荐部门也关心新用户,那么搜索部门和推荐部门分别是两个消费者组)
- 一个 消费者组 可以订阅多个 topic,组内的消费者实例会根据分配策略来消费这些主题下的消息。
- 一个 topic 可以被多个消费者组订阅,且不同消费者组维护自己的消费进度(
offset
情况),互不打搅。
二、入门 API
2.1 安装
- 使用 docker 启动 kafka
kafka:
image: 'bitnami/kafka:3.6.0'
ports:
- '9092:9092'
- '9094:9094'
environment:
- KAFKA_CFG_NODE_ID=0
# 允许自动创建 topic,线上环境不要开启
- KAFKA_CFG_AUTO_CREATE_TOPICS_ENABLE=true
- KAFKA_CFG_PROCESS_ROLES=controller,broker
- KAFKA_CFG_LISTENERS=PLAINTEXT://0.0.0.0:9092,CONTROLLER://:9093,EXTERNAL://0.0.0.0:9094
- KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://kafka:9092,EXTERNAL://localhost:9094
- KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CONTROLLER:PLAINTEXT,EXTERNAL:PLAINTEXT,PLAINTEXT:PLAINTEXT
- KAFKA_CFG_CONTROLLER_QUORUM_VOTERS=0@kafka:9093
- KAFKA_CFG_CONTROLLER_LISTENER_NAMES=CONTROLLER
2.2 使用 Sarama 作为客户端
使用 go install
命令安装如下两个包
2.3 实现 生产者
package sarama
import (
"github.com/IBM/sarama"
"github.com/stretchr/testify/assert"
"testing"
)
var addr = []string{"localhost:9094"}
// note 在 webook 目录下执行 `kafka-console-consumer -topic=test_topic -brokers=localhost:9094`查看是否发送了消息
func TestSyncProducer(t *testing.T) {
cfg := sarama.NewConfig()
// note 使生产者在成功发送消息后返回成功状态。这有助于测试函数获取到消息是否成功发送的信息
cfg.Producer.Return.Successes = true
// note 可以指定 partitioner
cfg.Producer.Partitioner = sarama.NewRandomPartitioner
//cfg.Producer.Partitioner = sarama.NewRandomPartitioner
//cfg.Producer.Partitioner = sarama.NewHashPartitioner
//cfg.Producer.Partitioner = sarama.NewManualPartitioner
//cfg.Producer.Partitioner = sarama.NewConsistentCRCHashPartitioner
//cfg.Producer.Partitioner = sarama.NewCustomPartitioner()
producer, err := sarama.NewSyncProducer(addr, cfg)
assert.NoError(t, err)
err = producer.SendMessages([]*sarama.ProducerMessage{
{
Topic: "test_topic",
Key: sarama.StringEncoder("这条msg对应的key"),
Value: sarama.StringEncoder("这是一条test_value"),
Headers: []sarama.RecordHeader{
{
Key: []byte("test_key"),
Value: []byte("这是一条test_value"),
},
},
Metadata: "test_metadata",
},
})
assert.NoError(t, err)
}
func TestAsyncProducer(t *testing.T) {
cfg := sarama.NewConfig()
// 为了后面能够拿到发送结果
cfg.Producer.Return.Successes = true
cfg.Producer.Return.Errors = true
// note 从上到下,性能变差,但是数据可靠性上升
cfg.Producer.RequiredAcks = sarama.NoResponse // 客户端发一次,不需要服务端的确认
cfg.Producer.RequiredAcks = sarama.WaitForLocal // 客户端发送,并且需要服务端写入到主分区
cfg.Producer.RequiredAcks = sarama.WaitForAll // 客户端发送,并且需要服务端同步到所有的 ISR (In Sync Replicas,就是跟上了节奏的从分区)
producer, err := sarama.NewAsyncProducer(addr, cfg)
assert.NoError(t, err)
msgCh := producer.Input()
msgCh <- &sarama.ProducerMessage{
Topic: "test_topic",
Value: sarama.StringEncoder("这是一条test_value"),
Headers: []sarama.RecordHeader{
{
Key: []byte("test_key"),
Value: []byte("这是一条test_value"),
},
},
Metadata: "test_metadata",
}
select {
case err := <-producer.Errors():
t.Log("发送失败,", err.Err, err.Msg)
case msg := <-producer.Successes():
t.Log("发送成功", string(msg.Value.(sarama.StringEncoder)))
}
}
在 docker compose 所在目录下执行 kafka-console-consumer -topic=test_topic -brokers=localhost:9094
查看是否发送了消息。
2.4 实现 消费者
package sarama
import (
"context"
"github.com/IBM/sarama"
"github.com/stretchr/testify/assert"
"golang.org/x/sync/errgroup"
"log"
"testing"
"time"
)
func TestConsumer(t *testing.T) {
cfg := sarama.NewConfig()
group, err := sarama.NewConsumerGroup(addr, "demo", cfg)
assert.NoError(t, err)
// 10s后context过期 or cancel 使得关闭consumer,退出消费
ctx, cancel := context.WithTimeout(context.Background(), time.Second*10)
defer cancel()
start := time.Now()
// note consumer 不关闭,则会一直阻塞在这里
err = group.Consume(ctx, []string{"test_topic"}, ConsumerHandler{})
assert.NoError(t, err)
t.Log(time.Since(start))
}
type ConsumerHandler struct {
}
func (c ConsumerHandler) Setup(session sarama.ConsumerGroupSession) error {
// 执行一些初始化的事情
log.Println("这是Handler Setup。。。")
return nil
}
// note 若是人为关闭Consumer,则不会触发Cleanup;若是context超时,则会执行Cleanup
func (c ConsumerHandler) Cleanup(session sarama.ConsumerGroupSession) error {
// 执行一些清理工作
log.Println("这是Handler Cleanup。。。")
return nil
}
func (c ConsumerHandler) ConsumeClaimV1(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
msgs := claim.Messages()
for msg := range msgs {
log.Println("Consumer收到来自producer的消息:", string(msg.Value))
// 提交
session.MarkMessage(msg, "")
}
return nil
}
2.4.1 优化:实现“批量消费,批量提交” ———— ConsumeClaimV2
方法
// 批量消费,批量提交
func (c ConsumerHandler) ConsumeClaimV2(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
msgCh := claim.Messages()
for {
const batchSize = 10
batch := make([]*sarama.ConsumerMessage, 0, batchSize)
for i := 0; i < batchSize; i++ {
msg := <-msgCh
batch = append(batch, msg)
}
// 在这里实现 批量消费和批量提交
log.Println(batch)
for _, msg := range batch {
session.MarkMessage(msg, "")
}
}
}
2.4.2 优化:实现“异步消费,批量提交” ———— ConsumeClaimV3
方法
使用 errgroup.Group
// 异步消费,批量提交
func (c ConsumerHandler) ConsumeClaimV3(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
msgCh := claim.Messages()
for {
const batchSize = 10
batch := make([]*sarama.ConsumerMessage, 0, batchSize)
eg := errgroup.Group{}
for i := 0; i < batchSize; i++ {
msg := <-msgCh
batch = append(batch, msg)
eg.Go(func() error {
log.Println("实现异步消费")
return nil
})
}
err := eg.Wait()
if err != nil {
log.Println("异步消费出现了错误")
}
// 在这里实现 批量提交
for _, msg := range batch {
session.MarkMessage(msg, "")
}
}
}
2.4.3 优化:实现“异步消费,批量提交,且引入超时机制” ———— ConsumeClaim
方法
// note 异步消费、批量提交(引入超时,避免一直阻塞在 <- msgs 中)
func (c ConsumerHandler) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
msgs := claim.Messages()
const batchSize = 10
for {
batch := make([]*sarama.ConsumerMessage, 0, batchSize) // 构建容量为10的切片
var eg errgroup.Group // 实现异步消费
ctx, cancel := context.WithTimeout(context.Background(), time.Second*10)
done := false
for i := 0; i < batchSize; i++ {
select {
case <-ctx.Done():
// ctx超时了,避免一直组赛在 msg := <- msgs里
// note 在case中使用break不起作用,所以引入 done 作为标记
done = true
case msg, ok := <-msgs: // note 需要ok,因为channel有可能被关闭
if !ok {
// channel被关闭了
cancel()
return nil
}
batch = append(batch, msg)
// 异步消费
eg.Go(func() error {
// 执行消费业务
log.Println("【并发】Consumer消费来自producer的消息:", string(msg.Value))
// 模拟业务执行所需要的时间
time.Sleep(time.Second * 3)
return nil
})
}
if done {
break
}
}
cancel()
if err := eg.Wait(); err != nil {
log.Println(err)
continue
}
// note 批量提交
for _, msg := range batch {
session.MarkMessage(msg, "")
}
}
}
转载自:https://juejin.cn/post/7389952004792483840