likes
comments
collection
share

怎么使用Kafka?收藏这篇短文就可以了

作者站长头像
站长
· 阅读数 9

〇、前言

便于大家对本章内容的理解,我重新整理了一下Kafka中的部分重要概念,以表格的方式呈现出来,请见下表所示:

名词解释
Broker 节点一个Kafka节点就是一个Broker,一个和多个Broker可以组成一个Kafka集群
Topic 主题Kafka根据topic对消息进行归类,发布到kafka集群的每套消息都需要指定一个topic,topic是一个逻辑概念,物理上是不存在的
Producer 生产者用于向Kafka中发送消息
Consumer 消费者从Kafka中获取消息
Consumer Group 消费组每个Consumer都会归属于一个消费组,一条消息可以同时被多个不同的消费组消费,但是只能被一个消费组中的消费者消费
Partition 分片物理上的概念,可以将一个topic上的数据拆分为多分放到Partition中,每个Patition内部的消息是有序的。

本篇文章的主要目的就是操作一下Kafka,从直观感受上面使用一下它,而不是让它仅仅存在于我们理论和想象中的认知上。

那么对于这种中间件的操作,我们一般来说普遍会采用两种方式:

方式1】通过bin路径下的脚本指令,在控制台端进行使用操作; 【方式2】通过对jar包的引用,在代码层面上进行使用操作;

在下面章节中,我们就分别针对控制台层面操作代码层面操作这两个方面,对Kafka进行第一次亲密的接触。

一、控制台层面操作

对于Kafka支持多少控制台指令,在其官网(https://kafka.apache.org/documentation/#quickstart)中就已经详细的列举出来了,我们可以很方面的从官网中获得对某个指令的解释和使用说明,如下所示:

怎么使用Kafka?收藏这篇短文就可以了

同样,在我们的安装了Kafka的bin目录下,也存在着对应这些指令的sh脚本文件, 也是它构成了我们可以非常方便的在控制台这一层面操作Kafka的可能性,如下所示:

怎么使用Kafka?收藏这篇短文就可以了

虽然指令和脚本文件挺多的,但是我们没有必要从头学一遍,毕竟是第一次操作Kafka,我们只做3件事:创建Topic、通过Producer发送消息和通过Consumer接受消息。

1.1> 创建/查看主题(kafka-topics.sh)

当我们发送消息的时候,主题topic是我们重要的参数之一,我们可以针对不同业务创建不同的topic,从而达到对消息的隔离性。在bin目录下,负责管理Topic的脚本是kafka-topics.sh,我们可以通过对它操作来实现topic的管理。下面,我们就来尝试创建一个名称为“muse”的Topic。

kafka_2.13-3.0.0> bin/kafka-topics.sh --bootstrap-server localhost:9092 --create --topic muse --partitions 1 --replication-factor 1

Created topic muse.

--bootstrap-server 】待链接到的Kafka服务地址,此处我们指定localhost:9092; 【 --create 】执行创建Topic主题指令; 【 --topic 】指定待创建的主题名称,此处我们指定创建名称为“muse”的topic; 【 --partitions 】指定分区个数,由于我们采用单机模式,即只有1个Broker,所以指定创建1个分区; 【 --replication-factor 】指定创建副本的个数,此处我们指定创建1个副本,即主副本;

创建完主题Topic之后,我们也可以通过 --list指令 查看Kafka下所有主题列表,如下所示:

kafka_2.13-3.0.0> bin/kafka-topics.sh --list --bootstrap-server localhost:9092                

muse

同样的,我们也可以通过 --describe指令 查看刚刚创建的那个名称为“muse”的Topic内的具体描述信息:

kafka_2.13-3.0.0> bin/kafka-topics.sh --describe --topic muse --bootstrap-server localhost:9092                            
Topic: muse	TopicId: iDQpnERjSI2IvaB2kaB2aQ	PartitionCount: 1	ReplicationFactor: 1	Configs: segment.bytes=1073741824
Topic: muse	Partition: 0	Leader: 0	Replicas: 0	Isr: 0

1.2> 生产端(kafka-console-producer.sh)

在上面,我们已经创建好了名称为“muse”的主题Topic了,那么我们就可以尝试向这个Topic发送消息了。此时,我们可以通过使用kafka-console-producer.sh来发送消息,它可以从本地文件中读取内容,或者我们也可以从命令行中直接输入内容,并将这些内容以消息的形式发送到kafka集群中。在默认情况下,每一行都会被当做一个独立的消息。具体发送语句如下所示:

kafka_2.13-3.0.0> bin/kafka-console-producer.sh --topic muse --bootstrap-server localhost:9092                
>message1
>message2
>

其中,通过使用--bootstrap-server来指定Kafka服务地址;如果配置了Kafka集群,用逗号分割即可。

1.3> 消费端(kafka-console-consumer.sh)

上面我们虽然向Kafka中发送了两条消息——message1message2,但是由于此时并没有任何消费者Consumer,所以这两个消息也无法被读取。那么,我们可以利用kafka-console-consumer.sh来执行消息消费操作,具体消费指令如下所示:

kafka_2.13-3.0.0> bin/kafka-console-consumer.sh --topic muse --bootstrap-server localhost:9092                

我们发现执行了上面的指令,控制台没有输入任何内容,那么,我们切换到Producer段,再发送两条消息——message3message4,我们发现,此时Consumer端有消息输出了,如下所示:

kafka_2.13-3.0.0> bin/kafka-console-consumer.sh --topic muse --bootstrap-server localhost:9092                

message3
message4

发生上面情况的原因就是,在默认情况下,消费者是从最后一条消息的偏移量+1开始消费,即:Consumer客户端启动之前的消息是不会被消费的。那如果我们想要把Consumer客户端启动之前的消息也获取到,则可以添加--from-beginning参数即可,如下所示:

kafka_2.13-3.0.0> bin/kafka-console-consumer.sh --topic muse --bootstrap-server localhost:9092 --from-beginning

message1
message2
message3
message4

二、代码层面操作

项目中引入kafka-clients的依赖(也可以直接引入spring-kafka的依赖,里面内嵌了kafka-clients)

怎么使用Kafka?收藏这篇短文就可以了

2.1> 编写生产者端

2.1.1> 初始化配置

创建配置对象Properties

Properties properties = new Properties();

配置kafka的Broker列表

properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9093,127.0.0.1:9094,127.0.0.1:9095");

发出消息持久化机制参数

properties.put(ProducerConfig.ACKS_CONFIG, "1");

ACKS_CONFIG的类型有如下3种:

【acks=0】表示producer不需要等待任何broker确认收到消息的ACK回复,就可以继续发送下一条消息。性能最高,但是最容易丢失消息。 【acks=1】表示至少等待leader已经成功将数据写入本地log,但是不需要等待所有follower都写入成功,就可以继续发送下一条消息。这种情况下,如果follower没有成功备份数据,而此时leader又挂掉,则消息就会丢失。 【acks=-1/all】需要等待所有min.insync.replicas(默认为1,推荐配置>=2)这个参数配置的副本个数都成功写入日志。这种策略会保证只要有一个备份存活就不会丢失数据。这是最强的数据保证。

配置失败重试机制

properties.put(ProducerConfig.RETRIES_CONFIG, 3); // 失败重试3次 properties.put(ProducerConfig.RETRY_BACKOFF_MS_CONFIG, 300); // 重试间隔300ms

配置缓存相关信息

Producer的消息会先发送到本地缓冲区(BUFFER_MEMORY_CONFIG),而不是发送一次消息连接一次kafka。 kafka本地线程会从缓冲区去取数据(BATCH_SIZE_CONFIG),然后批量发送到Broker,即:一个批次满足16KB就会发送出去。 LINGER_MS_CONFIG的默认值为0,表示消息必须立即被发送,但这样会影响性能。 设置10ms也就是说Producer消息发送完后会进入本地的batch中;如果10ms内,这个batch满足了16KB,那么就会随着batch一起被发送出去。如果10ms内,batch没满,那么也必须要把消息发送出去,不能让消息的发送延迟时间太长。

配置key和value的序列化实现类

properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

2.1.2> 同步消息发送

怎么使用Kafka?收藏这篇短文就可以了

同步消息发送代码如下所示

怎么使用Kafka?收藏这篇短文就可以了

2.1.3> 异步消息发送

怎么使用Kafka?收藏这篇短文就可以了

2.2> 编写消费者端

2.2.1> 初始化配置

创建配置对象Properties

Properties properties = new Properties();

配置kafka的Broker列表

properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9093,127.0.0.1:9094,127.0.0.1:9095");

配置消费组

properties.put(ConsumerConfig.GROUP_ID_CONFIG, "museGroup");

offset的重置策略

properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); 【解释】offset的重置策略——例如:创建一个新的消费组,offset是不存在的,如何对offset赋值消费。 latest:默认值,只消费自己启动之后发送到主题的消息。 earliest:第一次从头开始消费,以后按照消费offset记录继续消费。

心跳相关配置

/** Consumer给Broker发送心跳的时间间隔 */
properties.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 1000);

/** 如果超过10秒没有接收到消费者的心跳,则会把消费者踢出消费组,然后重新进行rebalance操作,把分区分配给其他消费者 */
properties.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 10*1000);

poll相关配置

/** 一次poll最大拉取消息的条数,可以根据消费速度的快慢来设置 */
properties.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 500);

/** 如果两次poll的时间超出了30秒的时间间隔,kafka会认为整个Consumer的消费能力太弱,会将它踢出消费组。将分区分配给其他消费者 */
properties.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 30*1000);

配置key和value的反序列化实现类

properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

2.2.2> 自动提交offset

怎么使用Kafka?收藏这篇短文就可以了

自动提交offset

当消费者向Broker的log中poll到消息后,默认情况下,会向broker中名称为“__consumer_offsets”的Topic发送offset偏移量。 自动提交会出现丢失消息的情况。 因为如果Consumer还没消费完poll下来的消息就自动提交了偏移量,那么此时如果Consumer挂掉了,那么下一个消费者会从已经提交的offset的下一个位置开始消费消息。那么之前没有被消费的消息就丢失了。

怎么使用Kafka?收藏这篇短文就可以了

2.2.3> 手动提交offset

手动提交offset

当消费者从kafka的Broker日志文件中poll到消息并且消费完毕之后。再手动提交当前的offset。

怎么使用Kafka?收藏这篇短文就可以了

今天的文章内容就这些了:

写作不易,笔者几个小时甚至数天完成的一篇文章,只愿换来您几秒钟的 点赞 & 分享

更多技术干货,欢迎大家关注公众号“爪哇缪斯” ~ \(^o^)/ ~ 「干货分享,每天更新」