一起来学kafka之Kafka集群搭建
前言
目前正在出一个Kafka专题系列教程, 篇幅会较多, 喜欢的话,给个关注❤️ ~
本节给大家讲一下Kafka的一些核心概念以及如何利用docker快速的搭建Kafka集群~
好了, 废话不多说直接开整吧~
什么是 Kafka
Kafka是一种高吞吐量、分布式、可扩展的消息中间件系统,最初由LinkedIn公司开发。随着不断的发展,在最新的版本中它定义为分布式的流处理平台,现在在大数据应用中也是十分广泛。
它可以处理大量的实时数据流,被广泛应用于日志收集、事件处理、流处理、消息队列等场景。
Kafka的架构包含producer(生产者)、consumer(消费者)、broker(代理服务器)等组件。生产者可以将消息发送到Kafka集群,消费者可以从Kafka集群订阅消息并进行处理,而broker则是消息的中转服务器,负责存储和转发消息。
Kafka的特点包括:
-
高吞吐量:Kafka可以处理海量的数据流,支持每秒百万级别的消息处理。 -
可扩展性:Kafka的集群可以根据需要进行水平扩展,从而提高系统的性能和容量。 -
可靠性:Kafka支持多副本机制,可以保证数据的可靠性和高可用性。 -
灵活性:Kafka支持多种消息格式和协议,可以与各种系统和工具进行集成。
Kafka是一个开源的项目,已经成为了Apache软件基金会的顶级项目.
Kafka & 核心概念
接着,我们看下它的核心概念,这些概念都很重要,在后边的学习中都会遇到,概念一定要搞明白,对于理解Kafka的工作原理和使用方法非常重要。不然学习起来比较懵, 下面一起看一下核心概念:
Topic
Topic是消息的逻辑容器,用于对消息进行分类和存储。在Kafka中,消息会被发布到指定的topic中,并且可以被一个或多个消费者订阅。Topic是Kafka的核心概念之一,是实现消息传递的基础。
Producer
Producer是消息的生产者,用于向指定的topic中发送消息。Producer负责将消息发送到Kafka集群中的broker节点,并且可以在发送消息时指定消息的key,以便Kafka将消息分配到指定的partition中。
Consumer
Consumer是消息的消费者,用于从指定的topic中接收消息。Consumer负责从Kafka集群中的broker节点获取消息,并且可以指定从哪个partition中获取消息。消费者可以以不同的方式进行消息消费,例如批量消费、轮询消费等。
Broker
Broker是Kafka集群中的一个节点,用于存储和管理消息。Broker是Kafka的核心组件之一,负责接收和处理生产者发送的消息,并将其存储到磁盘中,同时还负责将消息转发给消费者。
Partition
Partition是Kafka中实现数据分片的机制,一个topic可以被分成多个partition,每个partition都是一个有序的消息队列。消息在被发送到一个topic时,会被根据指定的key进行hash计算,然后被分配到对应的partition中。
Offset
Offset是Kafka中的一个重要概念,用于标识每个消息在一个partition中的位置。每个partition都有一个唯一的offset值,消费者可以根据offset来获取指定位置的消息。Kafka还提供了一种特殊的topic,称为__consumer_offsets,用于存储消费者消费的位置信息。
Kafka & 主要架构
如图:
+---------+ +---------+
|Producer | |Consumer |
+---------+ +---------+
| |
| |
+---------+ +---------+
| Broker | | Broker |
+---------+ +---------+
/ \ / \
/ \ / \
+---------+ +---------+ +---------+ +---------+
|Partition| |Partition| |Partition| |Partition|
+---------+ +---------+ +---------+ +---------+
/ \ / \ / \ / \
/ \ / \ / \ / \
+----------+ +----------+ +----------+ +----------+ +----------+
|Replica | |Replica | |Replica | |Replica | |Replica |
+----------+ +----------+ +----------+ +----------+ +----------+
Leader Follower Follower Follower Follower
| | | |
| | | |
Write Read Read Read
| | | |
+--------+----------+------------+-----------+
| |
| |
+---------+ +---------+
| Disk | | Memory |
+---------+ +---------+
在这个流程图中,主要有以下几个流程:
-
Producer将消息发送到Broker节点,Broker将消息存储到对应的Partition中。 -
每个
Partition可以有多个Replica,其中一个Replica被选为Leader,其余Replica为Follower。 -
Leader负责处理消息的写操作,将消息追加到Partition中。 -
Follower负责与Leader保持同步,定期从Leader中拉取消息并复制到本地副本中,以保证数据的一致性。 -
Consumer从Broker中读取消息,可以指定消费某个Topic中的指定Partition中的消息,也可以进行批量消费或实时消费。 -
Broker将消息存储在磁盘中,同时也会缓存部分消息到内存中,以提高读写性能。
为了提高集群的可用性和稳定性, 架构中还会引入ZooKeeper, ZooKeeper用于维护Kafka集群中的Broker节点信息、Partition信息、Topic信息等。
Leader & Follower
上述提到了leader和Follower,有的小伙伴可能不知道是啥,这里讲下为啥会有这个?
在Kafka的分布式架构中,每个Partition可以有多个Replica,其中一个Replica被选为Leader,其余Replica为Follower。
Leader是指在一个Partition中,负责处理该Partition所有消息的读写操作的Replica。当Producer发送消息到该Partition时,消息会首先被发送到Leader所在的Replica,Leader再将消息追加到Partition中,然后将消息复制到所有Follower的Replica中。在读取数据时,Consumer也只能从Leader所在的Replica中读取消息,而Follower只负责与Leader保持同步,不参与读写操作。
Leader的选举方式与ZooKeeper密切相关,当Leader所在的Replica出现故障时,ZooKeeper会自动选举新的Leader,以保证Partition中的数据一致性和可用性。由于只有Leader负责读写操作,因此可以有效避免数据的冲突和重复`。
ZooKeeper
如果有小伙伴不知道ZooKeeper是啥,给大家简要介绍一下:
ZooKeeper是一个分布式协调服务,常用于分布式系统中的协调与通知。Kafka使用ZooKeeper来进行集群管理、Leader选举、存储Metadata信息等。ZooKeeper是Kafka的重要组成部分,没有ZooKeeper的支持,Kafka集群无法正常运行,往后的发展趋势可能会不用强依赖ZooKeeper`。
在Kafka中,每个Broker都会向ZooKeeper注册自己的节点信息,包括Broker ID、IP地址和端口号等。同时,每个Partition的Metadata信息也会存储在ZooKeeper中,包括该Partition的Replica信息、Leader信息、ISR信息等。当Broker加入或退出集群时,ZooKeeper会自动通知其他Broker更新集群的状态信息。在Leader选举时,ZooKeeper会根据预设的算法选举出新的Leader,并通知其他Broker更新Partition`的状态信息。
除了Kafka之外,ZooKeeper还被广泛应用于Hadoop、HBase、Solr等其他分布式系统中,是一个非常成熟和稳定的分布式协调服务。再举一个,Dubbo RPC服务开发框架,如果有用过Dubbo的小伙伴,ZooKeeper一定不会陌生。
这块知识点,大家一定要搞懂,也是面试的热点问题~
Kafka & 集群搭建
这里教大家如何使用docker部署Kafka集群,需要大家安装好docker, 如果不会安装的可以参考之前的文章es集群搭建。
给大家提前准备好了docker-compose.yml文件,配置有点多,如果没有一些docker基础,可能会看不懂,不过没关系,不影响我们的部署,
直接执行docker-compose up -d就完了。因为安装的东西比较多,包含zookeeper集群,kafka集群,kafka-ui管理后台,这个ui后台是一个开源的系统,界面比较整洁,推荐给大家,命令执行后稍稍等待一会~
version: '3.1'
services:
zoo1:
image: confluentinc/cp-zookeeper:7.3.2
hostname: zoo1
container_name: zoo1
ports:
- "2181:2181"
environment:
ZOOKEEPER_CLIENT_PORT: 2181
ZOOKEEPER_SERVER_ID: 1
ZOOKEEPER_SERVERS: zoo1:2888:3888;zoo2:2888:3888;zoo3:2888:3888
volumes:
- ./data/zookeeper/zoo1/data:/data
- ./data/zookeeper/zoo1/datalog:/datalog
zoo2:
image: confluentinc/cp-zookeeper:7.3.2
hostname: zoo2
container_name: zoo2
ports:
- "2182:2182"
environment:
ZOOKEEPER_CLIENT_PORT: 2182
ZOOKEEPER_SERVER_ID: 2
ZOOKEEPER_SERVERS: zoo1:2888:3888;zoo2:2888:3888;zoo3:2888:3888
volumes:
- ./data/zookeeper/zoo2/data:/data
- ./data/zookeeper/zoo2/datalog:/datalog
zoo3:
image: confluentinc/cp-zookeeper:7.3.2
hostname: zoo3
container_name: zoo3
ports:
- "2183:2183"
environment:
ZOOKEEPER_CLIENT_PORT: 2183
ZOOKEEPER_SERVER_ID: 3
ZOOKEEPER_SERVERS: zoo1:2888:3888;zoo2:2888:3888;zoo3:2888:3888
volumes:
- ./data/zookeeper/zoo3/data:/data
- ./data/zookeeper/zoo3/datalog:/datalog
kafka1:
image: confluentinc/cp-kafka:7.3.2
hostname: kafka1
container_name: kafka1
ports:
- "9092:9092"
- "29092:29092"
environment:
KAFKA_ADVERTISED_LISTENERS: INTERNAL://kafka1:19092,EXTERNAL://${DOCKER_HOST_IP:-127.0.0.1}:9092,DOCKER://host.docker.internal:29092
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT,DOCKER:PLAINTEXT
KAFKA_INTER_BROKER_LISTENER_NAME: INTERNAL
KAFKA_ZOOKEEPER_CONNECT: "zoo1:2181,zoo2:2182,zoo3:2183"
KAFKA_BROKER_ID: 1
KAFKA_LOG4J_LOGGERS: "kafka.controller=INFO,kafka.producer.async.DefaultEventHandler=INFO,state.change.logger=INFO"
KAFKA_AUTHORIZER_CLASS_NAME: kafka.security.authorizer.AclAuthorizer
KAFKA_ALLOW_EVERYONE_IF_NO_ACL_FOUND: "true"
volumes:
- ./data/kafka_data1:/kafka/data
depends_on:
- zoo1
- zoo2
- zoo3
kafka2:
image: confluentinc/cp-kafka:7.3.2
hostname: kafka2
container_name: kafka2
ports:
- "9093:9093"
- "29093:29093"
environment:
KAFKA_ADVERTISED_LISTENERS: INTERNAL://kafka2:19093,EXTERNAL://${DOCKER_HOST_IP:-127.0.0.1}:9093,DOCKER://host.docker.internal:29093
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT,DOCKER:PLAINTEXT
KAFKA_INTER_BROKER_LISTENER_NAME: INTERNAL
KAFKA_ZOOKEEPER_CONNECT: "zoo1:2181,zoo2:2182,zoo3:2183"
KAFKA_BROKER_ID: 2
KAFKA_LOG4J_LOGGERS: "kafka.controller=INFO,kafka.producer.async.DefaultEventHandler=INFO,state.change.logger=INFO"
KAFKA_AUTHORIZER_CLASS_NAME: kafka.security.authorizer.AclAuthorizer
KAFKA_ALLOW_EVERYONE_IF_NO_ACL_FOUND: "true"
volumes:
- ./data/kafka_data2:/kafka/data
depends_on:
- zoo1
- zoo2
- zoo3
kafka3:
image: confluentinc/cp-kafka:7.3.2
hostname: kafka3
container_name: kafka3
ports:
- "9094:9094"
- "29094:29094"
environment:
KAFKA_ADVERTISED_LISTENERS: INTERNAL://kafka3:19094,EXTERNAL://${DOCKER_HOST_IP:-127.0.0.1}:9094,DOCKER://host.docker.internal:29094
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT,DOCKER:PLAINTEXT
KAFKA_INTER_BROKER_LISTENER_NAME: INTERNAL
KAFKA_ZOOKEEPER_CONNECT: "zoo1:2181,zoo2:2182,zoo3:2183"
KAFKA_BROKER_ID: 3
KAFKA_LOG4J_LOGGERS: "kafka.controller=INFO,kafka.producer.async.DefaultEventHandler=INFO,state.change.logger=INFO"
KAFKA_AUTHORIZER_CLASS_NAME: kafka.security.authorizer.AclAuthorizer
KAFKA_ALLOW_EVERYONE_IF_NO_ACL_FOUND: "true"
volumes:
- ./data/kafka_data3:/kafka/data
depends_on:
- zoo1
- zoo2
- zoo3
kafka-ui:
container_name: kafka-ui
image: provectuslabs/kafka-ui:latest
ports:
- 9999:8080
depends_on:
- kafka1
- kafka2
- kafka3
environment:
KAFKA_CLUSTERS_0_NAME: k1
KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS: kafka1:29092
KAFKA_CLUSTERS_1_NAME: k2
KAFKA_CLUSTERS_1_BOOTSTRAPSERVERS: kafka2:29093
KAFKA_CLUSTERS_2_NAME: k3
KAFKA_CLUSTERS_2_BOOTSTRAPSERVERS: kafka3:29094
然后浏览器打开localhost:9999可以访问UI后台, 我们可以通过后台新建topic来验证集群是否工作。


这里再给大家推荐一个GUI工具,Kafka Assistant这个工具方便我们日常开发测试使用,界面也很简洁,直接桌面端安装,利用它发送几条消息。

然后我们到后台页面,观察三个节点,发现topic里都有test,并且消息都是存在的

结束语
本节到这里就结束, 概念有点多,需要好好理理,后边会结合实际案例给大家继续讲它的概念和工作原理。下节带大家看下Springboot整合Kafka实战
本着把自己知道的都告诉大家,如果本文对您有所帮助,点赞+关注鼓励一下呗~
ElasticSearch 专题学习
项目源码(源码已更新 欢迎star⭐️)
往期并发编程内容推荐
- Java多线程专题之线程与进程概述
- Java多线程专题之线程类和接口入门
- Java多线程专题之进阶学习Thread(含源码分析)
- Java多线程专题之Callable、Future与FutureTask(含源码分析)
- 面试官: 有了解过线程组和线程优先级吗
- 面试官: 说一下线程的生命周期过程
- 面试官: 说一下线程间的通信
- 面试官: 说一下Java的共享内存模型
- 面试官: 有了解过指令重排吗,什么是happens-before
- 面试官: 有了解过volatile关键字吗 说说看
- 面试官: 有了解过Synchronized吗 说说看
- Java多线程专题之Lock锁的使用
- 面试官: 有了解过ReentrantLock的底层实现吗?说说看
- 面试官: 有了解过CAS和原子操作吗?说说看
- Java多线程专题之线程池的基本使用
- 面试官: 有了解过线程池的工作原理吗?说说看
- 面试官: 线程池是如何做到线程复用的?有了解过吗,说说看
- 面试官: 阻塞队列有了解过吗?说说看
- 面试官: 阻塞队列的底层实现有了解过吗? 说说看
- 面试官: 同步容器和并发容器有用过吗? 说说看
- 面试官: CopyOnWrite容器有了解过吗? 说说看
- 面试官: Semaphore在项目中有使用过吗?说说看(源码剖析)
- 面试官: Exchanger在项目中有使用过吗?说说看(源码剖析)
- 面试官: CountDownLatch有了解过吗?说说看(源码剖析)
- 面试官: CyclicBarrier有了解过吗?说说看(源码剖析)
- 面试官: Phaser有了解过吗?说说看
- 面试官: Fork/Join 有了解过吗?说说看(含源码分析)
- 面试官: Stream并行流有了解过吗?说说看
推荐 SpringBoot & SpringCloud (源码已更新 欢迎star⭐️)
博客(阅读体验较佳)
转载自:https://juejin.cn/post/7208048755123601466