likes
comments
collection
share

Kafka基本概念和使用API(附图易理解)

作者站长头像
站长
· 阅读数 44

一、重要概念

下图以某个 Consumer Group 订阅的某个 topic (该 topic 有 partition1 和 partition2 两个分区)为例

Kafka基本概念和使用API(附图易理解)

1.1 Kafka 的高性能、高扩展性和高可用的体现

  • 高性能的体现】一个 topic 分成多个 partition,消费者的消费以 分区 为单位。避免数据的堆积,提升了吞吐量。
  • 高扩展性的体现】将 partition 分散部署到多台机器(即,broker)上,可以通过增加 broker 缓解机器 cpu 过高带来的性能问题。避免多个 partition 部署在同一个机器上,导致单机 cpu 和内存过高,影响系统整体的性能。
  • 高可用的体现】将 partition 分为 leader 和 follower,前者负责生产和消费,后者负责备份前者。而且,将 Leader 和 Follower 分散到不同的 broker 上,即使 Leader 所在的 broker 挂了,也不会影响到 Follower 所在的 broker, 并且还能从 Follower 中选举出一个新的 Leader partition 顶上。此外,将 partition 持久化的磁盘中,即使 broker 都挂了,重启服务后,也能从磁盘里读出数据,继续工作。

1.2 其他组件作用:

  • 消费者组:关心某些 topic 的业务方(比如说在创建用户的时候,搜索部门关心新用户, 推荐部门也关心新用户,那么搜索部门和推荐部门分别是两个消费者组)

    • 一个 消费者组 可以订阅多个 topic,组内的消费者实例会根据分配策略来消费这些主题下的消息。
    • 一个 topic 可以被多个消费者组订阅,且不同消费者组维护自己的消费进度(offset情况),互不打搅。

Kafka基本概念和使用API(附图易理解)

二、入门 API

2.1 安装

  1. 使用 docker 启动 kafka
kafka:
  image: 'bitnami/kafka:3.6.0'
  ports:
    - '9092:9092'
    - '9094:9094'
  environment:
    - KAFKA_CFG_NODE_ID=0
    # 允许自动创建 topic,线上环境不要开启
    - KAFKA_CFG_AUTO_CREATE_TOPICS_ENABLE=true
    - KAFKA_CFG_PROCESS_ROLES=controller,broker
    - KAFKA_CFG_LISTENERS=PLAINTEXT://0.0.0.0:9092,CONTROLLER://:9093,EXTERNAL://0.0.0.0:9094
    - KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://kafka:9092,EXTERNAL://localhost:9094
    - KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CONTROLLER:PLAINTEXT,EXTERNAL:PLAINTEXT,PLAINTEXT:PLAINTEXT
    - KAFKA_CFG_CONTROLLER_QUORUM_VOTERS=0@kafka:9093
    - KAFKA_CFG_CONTROLLER_LISTENER_NAMES=CONTROLLER

2.2 使用 Sarama 作为客户端

使用 go install 命令安装如下两个包

Kafka基本概念和使用API(附图易理解)

2.3 实现 生产者

package sarama

import (
    "github.com/IBM/sarama"
    "github.com/stretchr/testify/assert"
    "testing"
)

var addr = []string{"localhost:9094"}

// note 在 webook 目录下执行 `kafka-console-consumer -topic=test_topic -brokers=localhost:9094`查看是否发送了消息
func TestSyncProducer(t *testing.T) {
    cfg := sarama.NewConfig()
    // note 使生产者在成功发送消息后返回成功状态。这有助于测试函数获取到消息是否成功发送的信息
    cfg.Producer.Return.Successes = true
    // note 可以指定 partitioner
    cfg.Producer.Partitioner = sarama.NewRandomPartitioner
    //cfg.Producer.Partitioner = sarama.NewRandomPartitioner
    //cfg.Producer.Partitioner = sarama.NewHashPartitioner
    //cfg.Producer.Partitioner = sarama.NewManualPartitioner
    //cfg.Producer.Partitioner = sarama.NewConsistentCRCHashPartitioner
    //cfg.Producer.Partitioner = sarama.NewCustomPartitioner()

    producer, err := sarama.NewSyncProducer(addr, cfg)

    assert.NoError(t, err)
    err = producer.SendMessages([]*sarama.ProducerMessage{
       {
          Topic: "test_topic",
          Key:   sarama.StringEncoder("这条msg对应的key"),
          Value: sarama.StringEncoder("这是一条test_value"),
          Headers: []sarama.RecordHeader{
             {
                Key:   []byte("test_key"),
                Value: []byte("这是一条test_value"),
             },
          },
          Metadata: "test_metadata",
       },
    })
    assert.NoError(t, err)
}

func TestAsyncProducer(t *testing.T) {
    cfg := sarama.NewConfig()
    // 为了后面能够拿到发送结果
    cfg.Producer.Return.Successes = true
    cfg.Producer.Return.Errors = true

    // note 从上到下,性能变差,但是数据可靠性上升
    cfg.Producer.RequiredAcks = sarama.NoResponse   // 客户端发一次,不需要服务端的确认
    cfg.Producer.RequiredAcks = sarama.WaitForLocal // 客户端发送,并且需要服务端写入到主分区
    cfg.Producer.RequiredAcks = sarama.WaitForAll   // 客户端发送,并且需要服务端同步到所有的 ISR (In Sync Replicas,就是跟上了节奏的从分区)

    producer, err := sarama.NewAsyncProducer(addr, cfg)
    assert.NoError(t, err)

    msgCh := producer.Input()
    msgCh <- &sarama.ProducerMessage{
       Topic: "test_topic",
       Value: sarama.StringEncoder("这是一条test_value"),
       Headers: []sarama.RecordHeader{
          {
             Key:   []byte("test_key"),
             Value: []byte("这是一条test_value"),
          },
       },
       Metadata: "test_metadata",
    }

    select {
    case err := <-producer.Errors():
       t.Log("发送失败,", err.Err, err.Msg)
    case msg := <-producer.Successes():
       t.Log("发送成功", string(msg.Value.(sarama.StringEncoder)))
    }
}

在 docker compose 所在目录下执行 kafka-console-consumer -topic=test_topic -brokers=localhost:9094查看是否发送了消息。

Kafka基本概念和使用API(附图易理解)

2.4 实现 消费者

package sarama

import (
    "context"
    "github.com/IBM/sarama"
    "github.com/stretchr/testify/assert"
    "golang.org/x/sync/errgroup"
    "log"
    "testing"
    "time"
)

func TestConsumer(t *testing.T) {
    cfg := sarama.NewConfig()
    group, err := sarama.NewConsumerGroup(addr, "demo", cfg)
    assert.NoError(t, err)

    // 10s后context过期 or cancel 使得关闭consumer,退出消费
    ctx, cancel := context.WithTimeout(context.Background(), time.Second*10)
    defer cancel()

    start := time.Now()
    // note consumer 不关闭,则会一直阻塞在这里
    err = group.Consume(ctx, []string{"test_topic"}, ConsumerHandler{})
    assert.NoError(t, err)
    t.Log(time.Since(start))
}

type ConsumerHandler struct {
}

func (c ConsumerHandler) Setup(session sarama.ConsumerGroupSession) error {
    // 执行一些初始化的事情
    log.Println("这是Handler Setup。。。")
    return nil
}

// note 若是人为关闭Consumer,则不会触发Cleanup;若是context超时,则会执行Cleanup
func (c ConsumerHandler) Cleanup(session sarama.ConsumerGroupSession) error {
    // 执行一些清理工作
    log.Println("这是Handler Cleanup。。。")
    return nil
}

func (c ConsumerHandler) ConsumeClaimV1(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
    msgs := claim.Messages()
    for msg := range msgs {
       log.Println("Consumer收到来自producer的消息:", string(msg.Value))
       // 提交
       session.MarkMessage(msg, "")
    }
    return nil
}

2.4.1 优化:实现“批量消费,批量提交” ———— ConsumeClaimV2方法

// 批量消费,批量提交
func (c ConsumerHandler) ConsumeClaimV2(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
    msgCh := claim.Messages()
    for {
       const batchSize = 10
       batch := make([]*sarama.ConsumerMessage, 0, batchSize)
       for i := 0; i < batchSize; i++ {
          msg := <-msgCh
          batch = append(batch, msg)
       }
       
       // 在这里实现 批量消费和批量提交
       log.Println(batch)
       for _, msg := range batch {
          session.MarkMessage(msg, "")
       }
    }
}

2.4.2 优化:实现“异步消费,批量提交” ———— ConsumeClaimV3方法

使用 errgroup.Group

// 异步消费,批量提交
func (c ConsumerHandler) ConsumeClaimV3(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
    msgCh := claim.Messages()
    for {
       const batchSize = 10
       batch := make([]*sarama.ConsumerMessage, 0, batchSize)
       eg := errgroup.Group{}
       for i := 0; i < batchSize; i++ {
          msg := <-msgCh
          batch = append(batch, msg)
          eg.Go(func() error {
             log.Println("实现异步消费")
             return nil
          })
       }

       err := eg.Wait()
       if err != nil {
          log.Println("异步消费出现了错误")
       }
       
       // 在这里实现 批量提交
       for _, msg := range batch {
          session.MarkMessage(msg, "")
       }
    }
}

2.4.3 优化:实现“异步消费,批量提交,且引入超时机制” ———— ConsumeClaim方法

// note 异步消费、批量提交(引入超时,避免一直阻塞在 <- msgs 中)
func (c ConsumerHandler) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
    msgs := claim.Messages()
    const batchSize = 10
    for {
       batch := make([]*sarama.ConsumerMessage, 0, batchSize) // 构建容量为10的切片
       var eg errgroup.Group                                  // 实现异步消费
       ctx, cancel := context.WithTimeout(context.Background(), time.Second*10)
       done := false
       for i := 0; i < batchSize; i++ {
          select {
          case <-ctx.Done():
             // ctx超时了,避免一直组赛在 msg := <- msgs里
             // note 在case中使用break不起作用,所以引入 done 作为标记
             done = true
          case msg, ok := <-msgs: // note 需要ok,因为channel有可能被关闭
             if !ok {
                // channel被关闭了
                cancel()
                return nil
             }
             batch = append(batch, msg)
             // 异步消费

            eg.Go(func() error {
                // 执行消费业务
                log.Println("【并发】Consumer消费来自producer的消息:", string(msg.Value))
                // 模拟业务执行所需要的时间
                time.Sleep(time.Second * 3)
                return nil
            })
          }
          if done {
             break
          }
       }
       cancel()
       if err := eg.Wait(); err != nil {
          log.Println(err)
          continue
       }
       // note 批量提交
       for _, msg := range batch {
          session.MarkMessage(msg, "")
       }
    }
}
转载自:https://juejin.cn/post/7389952004792483840
评论
请登录