likes
comments
collection
share

Golang kafka简述和操作(sarama同步异步和消费组)

作者站长头像
站长
· 阅读数 19

一、Kafka简述

1. 为什么需要用到消息队列

  • 异步:对比以前的串行同步方式来说,可以在同一时间做更多的事情,提高效率;

Golang kafka简述和操作(sarama同步异步和消费组)

  • 解耦:在耦合太高的场景,多个任务要对同一个数据进行操作消费的时候,会导致一个任务的处理因为另一个任务对数据的操作变得及其复杂。

Golang kafka简述和操作(sarama同步异步和消费组)

  • 缓冲:当遇到突发大流量的时候,消息队列可以先把所有消息有序保存起来,避免直接作用于系统主体,系统主题始终以一个平稳的速率去消费这些消息。

Golang kafka简述和操作(sarama同步异步和消费组)

2.为什么选择kafka呢?

这没有绝对的好坏,看个人需求来选择,我这里就抄了一段他人总结的的优缺点,可见原文

kafka的优点:

    1.支持多个生产者和消费者
    2.支持broker的横向拓展
    3.副本集机制,实现数据冗余,保证数据不丢失
    4.通过topic将数据进行分类
    5.通过分批发送压缩数据的方式,减少数据传输开销,提高吞高量
    6.支持多种模式的消息
    7.基于磁盘实现数据的持久化
    8.高性能的处理信息,在大数据的情况下,可以保证亚秒级的消息延迟
    9.一个消费者可以支持多种topic的消息
    10.对CPU和内存的消耗比较小
    11.对网络开销也比较小
    12.支持跨数据中心的数据复制
    13.支持镜像集群

kafka的缺点:

    1.由于是批量发送,所以数据达不到真正的实时
    2.对于mqtt协议不支持
    3.不支持物联网传感数据直接接入
    4.只能支持统一分区内消息有序,无法实现全局消息有序
    5.监控不完善,需要安装插件
    6.需要配合zookeeper进行元数据管理
    7.会丢失数据,并且不支持事务
    8.可能会重复消费数据,消息会乱序,可用保证一个固定的partition内部的消息是有序的,但是一个topic有多个partition的话,就不能保证有序了,需要zookeeper的支持,topic一般需要人工创建,部署和维护一般都比mq高

二、 Golang 操作kafka

1. kafka的环境

网上有很多搭建kafka环境教程,这里就不再搭建,就展示一下kafka的环境,在kubernetes上进行的搭建,有需要的私我,可以发yaml文件

Golang kafka简述和操作(sarama同步异步和消费组)

2. 第三方库

github.com/Shopify/sarama // kafka主要的库*
github.com/bsm/sarama-cluster // kafka消费组 

3. 消费者

  • 单个消费者
func consumer() {
   var wg sync.WaitGroup
   consumer, err := sarama.NewConsumer([]string{"172.20.3.13:30901"}, nil)
   if err != nil {
      fmt.Println("Failed to start consumer: %s", err)
      return
   }
   partitionList, err := consumer.Partitions("test0") //获得该topic所有的分区
   if err != nil {
      fmt.Println("Failed to get the list of partition:, ", err)
      return
   }

   for partition := range partitionList {
      pc, err := consumer.ConsumePartition("test0", int32(partition), sarama.OffsetNewest)
      if err != nil {
         fmt.Println("Failed to start consumer for partition %d: %s\n", partition, err)
         return
      }
      wg.Add(1)
      go func(sarama.PartitionConsumer) {  //为每个分区开一个go协程去取值
         for msg := range pc.Messages() { //阻塞直到有值发送过来,然后再继续等待
            fmt.Printf("Partition:%d, Offset:%d, key:%s, value:%s\n", msg.Partition, msg.Offset, string(msg.Key), string(msg.Value))
         }
         defer pc.AsyncClose()
         wg.Done()
      }(pc)
   }
   wg.Wait()
}
func main() {
   consumer()
}
  • 消费组
func consumerCluster() {
   groupID := "group-1"
   config := cluster.NewConfig()
   config.Group.Return.Notifications = true
   config.Consumer.Offsets.CommitInterval = 1 * time.Second
   config.Consumer.Offsets.Initial = sarama.OffsetNewest //初始从最新的offset开始

   c, err := cluster.NewConsumer(strings.Split("172.20.3.13:30901", ","),groupID, strings.Split("test0", ","), config)
   if err != nil {
      glog.Errorf("Failed open consumer: %v", err)
      return
   }
   defer c.Close()
   go func(c *cluster.Consumer) {
      errors := c.Errors()
      noti := c.Notifications()
      for {
         select {
         case err := <-errors:
            glog.Errorln(err)
         case <-noti:
         }
      }
   }(c)

   for msg := range c.Messages() {
      fmt.Printf("Partition:%d, Offset:%d, key:%s, value:%s\n", msg.Partition, msg.Offset, string(msg.Key), string(msg.Value))
      c.MarkOffset(msg, "") //MarkOffset 并不是实时写入kafka,有可能在程序crash时丢掉未提交的offset
   }
}
func main() {
   go consumerCluster()
}

4. 生产者

  • 同步生产者
package main

import (
   "fmt"
   "github.com/Shopify/sarama"
)

func main() {
   config := sarama.NewConfig()
   config.Producer.RequiredAcks = sarama.WaitForAll          //赋值为-1:这意味着producer在follower副本确认接收到数据后才算一次发送完成。
   config.Producer.Partitioner = sarama.NewRandomPartitioner //写到随机分区中,默认设置8个分区
   config.Producer.Return.Successes = true
   msg := &sarama.ProducerMessage{}
   msg.Topic = `test0`
   msg.Value = sarama.StringEncoder("Hello World!")
   client, err := sarama.NewSyncProducer([]string{"172.20.3.13:30901"}, config)
   if err != nil {
      fmt.Println("producer close err, ", err)
      return
   }
   defer client.Close()
   pid, offset, err := client.SendMessage(msg)

   if err != nil {
      fmt.Println("send message failed, ", err)
      return
   }
   fmt.Printf("分区ID:%v, offset:%v \n", pid, offset)
}
  • 异步生产者
func asyncProducer() {
   config := sarama.NewConfig()
   config.Producer.Return.Successes = true //必须有这个选项
   config.Producer.Timeout = 5 * time.Second
   p, err := sarama.NewAsyncProducer(strings.Split("172.20.3.13:30901", ","), config)
   defer p.Close()
   if err != nil {
      return
   }
   //这个部分一定要写,不然通道会被堵塞
   go func(p sarama.AsyncProducer) {
      errors := p.Errors()
      success := p.Successes()
      for {
         select {
         case err := <-errors:
            if err != nil {
               glog.Errorln(err)
            }
         case <-success:
         }
      }
   }(p)
   for {
      v := "async: " + strconv.Itoa(rand.New(rand.NewSource(time.Now().UnixNano())).Intn(10000))
      fmt.Fprintln(os.Stdout, v)
      msg := &sarama.ProducerMessage{
         Topic: topics,
         Value: sarama.ByteEncoder(v),
      }
      p.Input() <- msg
      time.Sleep(time.Second * 1)
   }
}
func main() {
   go asyncProducer()
   select {   
   }
}

5. 结果展示->

同步生产打印:

分区ID:0, offset:90 

消费打印:

Partition:0, Offset:90, key:, value:Hello World!

异步生产打印:

async: 7272
async: 7616
async: 998

消费打印:

Partition:0, Offset:91, key:, value:async: 7272
Partition:0, Offset:92, key:, value:async: 7616
Partition:0, Offset:93, key:, value:async: 998

6. 同步和异步差别

  • 同步模式producer把消息发给kafka之后会等待结果返回。
  • 异步模式producer把消息发给kafka之后不会等待结果返回。