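Kafka Source Code Walkthrough: Consumer Rebalance
Preface
This article is based on Kafka 2.7. It is a set of study notes written with reference to the book 《Apache Kafka源码剖析》 (Apache Kafka Source Code Analysis).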
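When is a rebalance triggered?
- A new consumer joins the Consumer Group
- A consumer actively leaves the Consumer Group
- A consumer crashes or goes offline. For example, a long GC pause or network delay can keep a consumer from sending a HeartbeatRequest to the Group Coordinator for too long; the Group Coordinator then considers the consumer dead and triggers a rebalance
- The number of partitions changes for any Topic the Consumer Group subscribes to
- A consumer calls unsubscribe() to cancel its subscription to a Topic
This is the description given in 《Apache Kafka源码剖析》. As Kafka has evolved, it is no longer fully accurate. Since Kafka 2.3 a consumer can be configured as a static member, which avoids the two rebalances otherwise caused by a consumer restart (one when it leaves and one when it rejoins), provided the consumer comes back within the session timeout. That addresses the third case. I have not yet fully worked out which conditions trigger a rebalance in newer versions; the blog post at www.bilibili.com/read/cv1164… explains it well and is worth rereading.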
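As a small illustration of the static-member option, the sketch below builds consumer properties with `group.instance.id` set. The configuration keys are standard consumer settings; the broker address, group name, instance id and the 60-second session timeout are placeholder values chosen only for this example, and the class name `StaticMemberConfig` is hypothetical.

```java
import java.util.Properties;

final class StaticMemberConfig {
    // Builds consumer properties for a static member. All values are illustrative placeholders.
    static Properties build(String groupId, String instanceId) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:9092"); // placeholder address
        props.setProperty("group.id", groupId);
        // A non-empty group.instance.id marks this consumer as a static member (Kafka 2.3+).
        props.setProperty("group.instance.id", instanceId);
        // A larger session timeout gives a restarting instance time to come back
        // before the coordinator treats it as dead (assumed value, tune per deployment).
        props.setProperty("session.timeout.ms", "60000");
        return props;
    }

    public static void main(String[] args) {
        System.out.println(build("demo-group", "consumer-1"));
    }
}
```

These properties would normally be passed to the KafkaConsumer constructor together with the deserializer settings, which are omitted here.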
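What is the Group Coordinator
Coordinator means exactly what the name says. Each Consumer Group is served by one broker that acts as its Group Coordinator. The Group Coordinator monitors the heartbeats of all consumers in the group and uses them to manage the group's rebalances; in other words, this role exists mainly to coordinate rebalances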
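How a group is mapped to its coordinator is not covered in these notes. As far as I understand the broker code, the group id is hashed onto a partition of the internal `__consumer_offsets` topic, and the broker leading that partition acts as the Group Coordinator. The sketch below only reproduces that hashing idea; the class and method names are hypothetical, and the partition count of 50 used in `main` is the usual default of `offsets.topic.num.partitions`, assumed here.

```java
final class CoordinatorPartition {
    // Maps a group id to a partition index of the offsets topic.
    // Integer.MIN_VALUE is special-cased because Math.abs would return a negative value for it.
    static int partitionFor(String groupId, int offsetsTopicPartitions) {
        int hash = groupId.hashCode();
        int nonNegative = (hash == Integer.MIN_VALUE) ? 0 : Math.abs(hash);
        return nonNegative % offsetsTopicPartitions;
    }

    public static void main(String[] args) {
        System.out.println(partitionFor("demo-group", 50));
    }
}
```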
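The rebalance process
- When a consumer is about to join the Consumer Group, or when the Group Coordinator has failed over, the consumer does not know the host and port of the Group Coordinator. It therefore sends a FindCoordinatorRequest to the least-loaded Node in the Kafka cluster; the broker that receives it replies with a FindCoordinatorResponse containing the address of the Group Coordinator that manages this Consumer Group
- Once the Group Coordinator has been found, the consumer sends it a JoinGroupRequest containing information about the consumer. The Group Coordinator stores this information and waits for the JoinGroupRequests of all consumers. The rebalanceTimeoutMs in the JoinGroupRequest tells the Group Coordinator how long to wait for members to join, while sessionTimeoutMs is how long it waits for a heartbeat before treating the consumer as dead
- Based on the JoinGroupRequests from all consumers, the Group Coordinator determines which consumers in the Consumer Group are available and picks one of them as the leader. It sends the chosen assignment strategy and the other consumers' subscription information to the leader. Every consumer receives a JoinGroupResponse, but only the leader's response carries the information of all consumers. Once a consumer learns that it is the leader, it computes the partition assignment from the consumer information, the Kafka cluster metadata and the assignment strategy. The other consumers receive a JoinGroupResponse with an empty member list, so they compute nothing and go on to wait for the assignment
- Next, all consumers enter the Synchronizing Group State phase and each sends a SyncGroupRequest to the Group Coordinator. The leader's SyncGroupRequest contains the partition assignment; the SyncGroupRequest of an ordinary consumer carries an empty assignment
- The Group Coordinator wraps the partition assignment in a SyncGroupResponse and returns it to every consumer
- Each consumer parses the SyncGroupResponse and thereby learns the mapping between partitions and consumers
- From then on the consumer keeps sending periodic heartbeats to the Group Coordinator
As shown in the figure below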
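The join and sync steps can be mimicked with a small in-memory model. This is only a toy sketch under stated assumptions, not Kafka's implementation: `ToyGroupCoordinator` and all of its methods are hypothetical, the first joiner is simply treated as the leader, and a plain round-robin stands in for the configured assignment strategy.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

final class ToyGroupCoordinator {
    private final List<String> members = new ArrayList<>();
    private Map<String, List<Integer>> assignment = new LinkedHashMap<>();

    // JoinGroup: the coordinator remembers every member that asks to join.
    void join(String memberId) { members.add(memberId); }

    // In this toy model the first member to join is the leader.
    String leader() { return members.get(0); }

    // Only the leader's JoinGroup response carries the full member list.
    List<String> membersForLeader() { return new ArrayList<>(members); }

    // SyncGroup from the leader: the coordinator stores the computed assignment.
    void syncFromLeader(Map<String, List<Integer>> computed) { assignment = computed; }

    // SyncGroup response: every member gets back only its own partitions.
    List<Integer> syncResponse(String memberId) {
        return assignment.getOrDefault(memberId, new ArrayList<>());
    }

    // Leader-side assignment: partition p goes to member (p mod memberCount).
    static Map<String, List<Integer>> assignRoundRobin(List<String> members, int partitionCount) {
        Map<String, List<Integer>> result = new LinkedHashMap<>();
        for (String member : members) result.put(member, new ArrayList<>());
        for (int p = 0; p < partitionCount; p++) {
            result.get(members.get(p % members.size())).add(p);
        }
        return result;
    }

    public static void main(String[] args) {
        ToyGroupCoordinator coordinator = new ToyGroupCoordinator();
        coordinator.join("A");
        coordinator.join("B");
        coordinator.syncFromLeader(assignRoundRobin(coordinator.membersForLeader(), 3));
        System.out.println("A -> " + coordinator.syncResponse("A"));
        System.out.println("B -> " + coordinator.syncResponse("B"));
    }
}
```

The point of the model is the division of labor: the coordinator only collects members and relays the result, while the assignment itself is computed on the client side by the leader.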
ConsumerNetworkClient
ConsumerNetworkClient is a wrapper on top of NetworkClient and is mainly responsible for network interaction with brokers
Some important member variables
// ConcurrentMap<Node, ConcurrentLinkedQueue<ClientRequest>>
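// Buffer of pending requests: the key is a Node, the value is the queue of ClientRequests destined for that Node
private final UnsentRequests unsent = new UnsentRequests();
// Kafka cluster metadata
private final Metadata metadata;
// Retry backoff time
private final long retryBackoffMs;
// Maximum time a single poll may block, capped at 5000 ms
private final int maxPollTimeoutMs;
// Request timeout, 30000 ms by default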
private final int requestTimeoutMs;
// We do not need high throughput, so use a fair lock to try to avoid starvation
private final ReentrantLock lock = new ReentrantLock(true);
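// Holds the completion handlers of finished requests: when a request completes, its handler is put on this queue and the handlers are later fired together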
private final ConcurrentLinkedQueue<RequestFutureCompletionHandler> pendingCompletion = new ConcurrentLinkedQueue<>();
Some important methods
unsent holds network requests that have been queued but not yet sent. Let's see at which point requests are put into it
public RequestFuture<ClientResponse> send(Node node,
AbstractRequest.Builder<?> requestBuilder,
int requestTimeoutMs) {
long now = time.milliseconds();
// Create the completion handler (the callback object)
RequestFutureCompletionHandler completionHandler = new RequestFutureCompletionHandler();
// Build the request object
ClientRequest clientRequest = client.newClientRequest(node.idString(), requestBuilder, now, true,
requestTimeoutMs, completionHandler);
// Put the request into the unsent buffer
unsent.put(node, clientRequest);
// wakeup the client in case it is blocking in poll so that we can send the queued request
client.wakeup();
return completionHandler.future;
}
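ConsumerNetworkClient.poll() is the core method of ConsumerNetworkClient. It has several overloads, all of which end up calling poll(Timer timer, PollCondition pollCondition, boolean disableWakeup). timer bounds how long the call may block on network I/O, and pollCondition decides whether it should block at all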
public void poll(Timer timer, PollCondition pollCondition, boolean disableWakeup) {
// Fire the callbacks of requests that have already completed
firePendingCompletedRequests();
lock.lock();
try {
handlePendingDisconnects();
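// Try to send the requests that can be sent now
long pollDelayMs = trySend(timer.currentTimeMs());
// Use pollCondition to decide whether to block until a response arrives.
// For example, when the consumer fetches records and finds nothing buffered locally, it blocks here until the request sent to the broker completes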
if (pendingCompletion.isEmpty() && (pollCondition == null || pollCondition.shouldBlock())) {
long pollTimeout = Math.min(timer.remainingMs(), pollDelayMs);
if (client.inFlightRequestCount() == 0)
pollTimeout = Math.min(pollTimeout, retryBackoffMs);
client.poll(pollTimeout, timer.currentTimeMs());
} else {
client.poll(0, timer.currentTimeMs());
}
timer.update();
......
} finally {
lock.unlock();
}
// called without the lock to avoid deadlock potential if handlers need to acquire locks
firePendingCompletedRequests();
metadata.maybeThrowAnyException();
}
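ConsumerNetworkClient.trySend() loops over the requests buffered in unsent. For each Node it iterates over that node's queue of ClientRequests; on every iteration it calls NetworkClient.ready() to check the connection between the consumer and the node, and if the request can be sent it calls NetworkClient.send(), which puts the request into the InFlightRequests queue to await its response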
long trySend(long now) {
long pollDelayMs = maxPollTimeoutMs;
// send any requests that can be sent now
// Fetch the ClientRequests from unsent node by node
for (Node node : unsent.nodes()) {
Iterator<ClientRequest> iterator = unsent.requestIterator(node);
if (iterator.hasNext())
// Compute the poll delay; it serves as the upper bound on how long the Selector may block
pollDelayMs = Math.min(pollDelayMs, client.pollDelayMs(node, now));
while (iterator.hasNext()) {
ClientRequest request = iterator.next();
// Call NetworkClient.ready() to check whether the request can be sent
if (client.ready(node, now)) {
// Call NetworkClient.send() to queue the request for sending
client.send(request, now);
// Remove the request from the unsent queue
iterator.remove();
} else {
break;
}
}
}
// Return the poll delay
return pollDelayMs;
}
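The buffering behaviour of unsent and trySend() can be reduced to a per-node queue that is drained only while the node is ready. The sketch below is a simplified model, not the Kafka class: `ToyUnsentRequests` is hypothetical, nodes and requests are plain strings, and connection readiness (NetworkClient.ready() in the real code) is modeled as membership in a set.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.Set;

final class ToyUnsentRequests {
    private final Map<String, Queue<String>> unsent = new LinkedHashMap<>();
    final List<String> sent = new ArrayList<>();

    // Counterpart of send(): the request is only buffered, nothing goes on the wire yet.
    void put(String node, String request) {
        unsent.computeIfAbsent(node, n -> new ArrayDeque<>()).add(request);
    }

    // Counterpart of trySend(): drain each node's queue while that node is ready,
    // and stop at the first request whose node is not ready.
    void trySend(Set<String> readyNodes) {
        for (Map.Entry<String, Queue<String>> entry : unsent.entrySet()) {
            Iterator<String> iterator = entry.getValue().iterator();
            while (iterator.hasNext()) {
                String request = iterator.next();
                if (!readyNodes.contains(entry.getKey())) break;
                sent.add(entry.getKey() + ":" + request);
                iterator.remove();
            }
        }
    }

    int pending(String node) {
        Queue<String> queue = unsent.get(node);
        return queue == null ? 0 : queue.size();
    }

    public static void main(String[] args) {
        ToyUnsentRequests requests = new ToyUnsentRequests();
        requests.put("node-1", "FindCoordinator");
        requests.put("node-2", "Fetch");
        requests.trySend(new HashSet<>(Arrays.asList("node-1")));
        System.out.println(requests.sent + ", still pending on node-2: " + requests.pending("node-2"));
    }
}
```

Requests for a node that is not ready simply stay in the buffer and are retried on the next poll, which is why send() can return a future immediately.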
RequestFutureAdapter
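ConsumerNetworkClient handles the lowest-level network sending. So how are the Requests and Responses of these network calls processed? This is where the RequestFutureAdapter class comes in
RequestFutureAdapter is an abstract adapter class that defines onSuccess() and onFailure(). As the figure above shows, its implementations include HeartbeatResponseHandler, JoinGroupResponseHandler, LeaveGroupResponseHandler, SyncGroupResponseHandler and others; as the names suggest, they handle the responses to HeartbeatRequest, JoinGroupRequest and so on. Below we look at the code of FindCoordinatorResponseHandler to get a feel for how the response to a Group Coordinator lookup is handled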
private class FindCoordinatorResponseHandler extends RequestFutureAdapter<ClientResponse, Void> {
@Override
public void onSuccess(ClientResponse resp, RequestFuture<Void> future) {
log.debug("Received FindCoordinator response {}", resp);
// The lookup of the Group Coordinator succeeded; this is the broker's response
FindCoordinatorResponse findCoordinatorResponse = (FindCoordinatorResponse) resp.responseBody();
Errors error = findCoordinatorResponse.error();
if (error == Errors.NONE) {
synchronized (AbstractCoordinator.this) {
// Use Integer.MAX_VALUE - node.id as the coordinator id so that a separate connection is used for it
int coordinatorConnectionId = Integer.MAX_VALUE - findCoordinatorResponse.data().nodeId();
// Build the Node object for the Group Coordinator
AbstractCoordinator.this.coordinator = new Node(
coordinatorConnectionId,
findCoordinatorResponse.data().host(),
findCoordinatorResponse.data().port());
log.info("Discovered group coordinator {}", coordinator);
// Try to connect to the Group Coordinator
client.tryConnect(coordinator);
heartbeat.resetSessionTimeout();
}
// Propagate completion to the downstream future
future.complete(null);
} else if (error == Errors.GROUP_AUTHORIZATION_FAILED) {
future.raise(GroupAuthorizationException.forGroupId(rebalanceConfig.groupId));
} else {
log.debug("Group coordinator lookup failed: {}", findCoordinatorResponse.data().errorMessage());
future.raise(error);
}
}
@Override
public void onFailure(RuntimeException e, RequestFuture<Void> future) {
......
}
}
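So why define RequestFutureAdapter at all?
A convention such as an abstract class or an interface is usually defined because many similar scenarios exist and a common contract makes them easier to manage. As mentioned above, a consumer interacts with brokers in many ways, every interaction has a response, and different responses need different handling. By extending RequestFutureAdapter, each implementation handles exactly one kind of response, which avoids a large number of if/else branches. So how is RequestFutureAdapter actually used?
Set a breakpoint in the FindCoordinatorResponseHandler class to find the whole call chain
The call chain is as follows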
org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient#poll
org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient#firePendingCompletedRequests
org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.RequestFutureCompletionHandler#fireCompletion
org.apache.kafka.clients.consumer.internals.RequestFuture#complete
org.apache.kafka.clients.consumer.internals.RequestFuture#fireSuccess
org.apache.kafka.clients.consumer.internals.RequestFutureListener#onSuccess
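A diagram makes this easier to picture
// As mentioned above, RequestFutureAdapter handles the callbacks of interactions with the broker
// Before the request is actually sent to the broker, a listener is first added to the future's listener queue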
public <S> RequestFuture<S> compose(final RequestFutureAdapter<T, S> adapter) {
final RequestFuture<S> adapted = new RequestFuture<>();
addListener(new RequestFutureListener<T>() {
@Override
public void onSuccess(T value) {
adapter.onSuccess(value, adapted);
}
@Override
public void onFailure(RuntimeException e) {
adapter.onFailure(e, adapted);
}
});
return adapted;
}
// This method is called once the response has come back
public void fireCompletion() {
// If there was an exception, go through future.raise()
if (e != null) {
future.raise(e);
} else if (response.authenticationException() != null) {
future.raise(response.authenticationException());
} else if (response.wasDisconnected()) {
log.debug("Cancelled request with header {} due to node {} being disconnected",
response.requestHeader(), response.destination());
future.raise(DisconnectException.INSTANCE);
} else if (response.versionMismatch() != null) {
future.raise(response.versionMismatch());
} else {
// If there was no exception, go through future.complete()
future.complete(response);
}
}
// We only follow the success path here; the failure path is analogous
// On success this method is called
public void complete(T value) {
try {
if (value instanceof RuntimeException)
throw new IllegalArgumentException("The argument to complete can not be an instance of RuntimeException");
if (!result.compareAndSet(INCOMPLETE_SENTINEL, value))
throw new IllegalStateException("Invalid attempt to complete a request future which is already complete");
// The key step is this call
fireSuccess();
} finally {
completedLatch.countDown();
}
}
private void fireSuccess() {
T value = value();
while (true) {
// Take out the listener that was queued before the request was sent
RequestFutureListener<T> listener = listeners.poll();
if (listener == null)
break;
// Invoke the listener's callback here
listener.onSuccess(value);
}
}
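The compose() and fireSuccess() pair above is essentially future chaining through a listener queue. The following stripped-down sketch shows only the success path; `ToyFuture` is a hypothetical stand-in for RequestFuture, with a BiConsumer playing the role of RequestFutureAdapter.onSuccess(), and it is single-threaded for brevity.

```java
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.function.BiConsumer;
import java.util.function.Consumer;

final class ToyFuture<T> {
    private final Queue<Consumer<T>> listeners = new ArrayDeque<>();
    private T value;
    private boolean done;

    // Registers a listener; if the future is already complete the listener fires at once.
    void addListener(Consumer<T> listener) {
        if (done) listener.accept(value);
        else listeners.add(listener);
    }

    // Completes the future and drains the listener queue, like fireSuccess().
    void complete(T result) {
        if (done) throw new IllegalStateException("already complete");
        value = result;
        done = true;
        Consumer<T> listener;
        while ((listener = listeners.poll()) != null) listener.accept(result);
    }

    boolean isDone() { return done; }

    T value() {
        if (!done) throw new IllegalStateException("not complete yet");
        return value;
    }

    // Like compose(): returns a new future that the adapter completes
    // once it has translated the raw response of this future.
    <S> ToyFuture<S> compose(BiConsumer<T, ToyFuture<S>> adapter) {
        ToyFuture<S> adapted = new ToyFuture<>();
        addListener(raw -> adapter.accept(raw, adapted));
        return adapted;
    }

    public static void main(String[] args) {
        ToyFuture<String> rawResponse = new ToyFuture<>();
        ToyFuture<Integer> parsed = rawResponse.compose((resp, out) -> out.complete(resp.length()));
        rawResponse.complete("coordinator=broker-1");
        System.out.println(parsed.value());
    }
}
```

The caller only ever sees the adapted future, so the response handler can decide whether the logical operation succeeded even when the network call itself did.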
RequestFuture
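RequestFuture is a generic class. Its core members are as follows
// Queue of RequestFutureListeners that observe completion of the request. The RequestFutureListener interface has
// two methods, onSuccess() and onFailure(), for normal completion and for failure with an exception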
private final ConcurrentLinkedQueue<RequestFutureListener<T>> listeners = new ConcurrentLinkedQueue<>();
// Whether the request has completed; returns true both for normal completion and for failure with an exception
public boolean isDone() {
return result.get() != INCOMPLETE_SENTINEL;
}
// The response received on normal completion; only available if the request succeeded, otherwise an IllegalStateException is thrown
public T value() {
if (!succeeded())
throw new IllegalStateException("Attempt to retrieve value from future which hasn't successfully completed");
return (T) result.get();
}
// The exception that made the request fail; only available if the request failed, otherwise an IllegalStateException is thrown
public RuntimeException exception() {
if (!failed())
throw new IllegalStateException("Attempt to retrieve exception from future which hasn't failed");
return (RuntimeException) result.get();
}
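Rebalance code
ConsumerCoordinator
The sections above covered how a rebalance works. In KafkaConsumer, the ConsumerCoordinator component handles the interaction with the broker-side Group Coordinator. ConsumerCoordinator extends the abstract class AbstractCoordinator, so we first introduce the important member variables of AbstractCoordinator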
public abstract class AbstractCoordinator implements Closeable {
protected enum MemberState {
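// The consumer is not part of the group yet
UNJOINED,
// The consumer has sent a JoinGroupRequest but has not received the response yet
PREPARING_REBALANCE,
// The consumer has received the JoinGroupResponse but not its assignment yet
COMPLETING_REBALANCE,
// The consumer has joined the group and is sending heartbeats to the Group Coordinator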
STABLE;
public boolean hasNotJoinedGroup() {
return equals(UNJOINED) || equals(PREPARING_REBALANCE);
}
}
// Helper class for the heartbeat task
private final Heartbeat heartbeat;
// Node object holding the address of the Group Coordinator
private Node coordinator = null;
// Whether the consumer needs to (re)send a JoinGroupRequest
private boolean rejoinNeeded = true;
private boolean needsJoinPrepare = true;
// The heartbeat thread; once the consumer has joined the group it sends heartbeats periodically
private HeartbeatThread heartbeatThread = null;
// The future of the asynchronous request to join the group
private RequestFuture<ByteBuffer> joinFuture = null;
// The future of the asynchronous request to look up the Group Coordinator
private RequestFuture<Void> findCoordinatorFuture = null;
private volatile RuntimeException fatalFindCoordinatorException = null;
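// The rebalance generation handed out by the Group Coordinator; it distinguishes successive rebalances. Because of network delays and the like,
// a request from the previous rebalance may arrive during the current one; to avoid this interference, the generation is incremented on every rebalance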
private Generation generation = Generation.NO_GENERATION;
// Start time of the most recent rebalance
private long lastRebalanceStartMs = -1L;
// End time of the most recent rebalance
private long lastRebalanceEndMs = -1L;
}
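The MemberState values form a small state machine that is driven by the join and sync round trips. The sketch below spells out the transitions on the success path only; `ToyMemberState` and `ToyMember` are hypothetical names, and failure handling (falling back to UNJOINED) is left out.

```java
enum ToyMemberState {
    UNJOINED, PREPARING_REBALANCE, COMPLETING_REBALANCE, STABLE;

    // Same check as hasNotJoinedGroup() in the listing above.
    boolean hasNotJoinedGroup() {
        return this == UNJOINED || this == PREPARING_REBALANCE;
    }
}

final class ToyMember {
    private ToyMemberState state = ToyMemberState.UNJOINED;

    ToyMemberState state() { return state; }

    // JoinGroupRequest sent: the rebalance begins.
    void onJoinRequestSent() { state = ToyMemberState.PREPARING_REBALANCE; }

    // JoinGroupResponse received: only valid while a join is in flight.
    void onJoinResponse() {
        require(ToyMemberState.PREPARING_REBALANCE);
        state = ToyMemberState.COMPLETING_REBALANCE;
    }

    // SyncGroupResponse received: the assignment is known and the member is stable.
    void onSyncResponse() {
        require(ToyMemberState.COMPLETING_REBALANCE);
        state = ToyMemberState.STABLE;
    }

    private void require(ToyMemberState expected) {
        if (state != expected)
            throw new IllegalStateException("expected " + expected + " but was " + state);
    }

    public static void main(String[] args) {
        ToyMember member = new ToyMember();
        member.onJoinRequestSent();
        member.onJoinResponse();
        member.onSyncResponse();
        System.out.println(member.state());
    }
}
```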
Now let's look at the important member variables of ConsumerCoordinator
public final class ConsumerCoordinator extends AbstractCoordinator {
private final GroupRebalanceConfig rebalanceConfig;
private final Logger log;
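// List of partition assignors. Each ConsumerPartitionAssignor represents an assignment strategy this consumer supports, and the supported strategies
// are advertised in the JoinGroupRequest. The Group Coordinator picks one strategy that all consumers support
// and tells the leader consumer to use it for the assignment. A consumer can support several strategies, configured through
// the partition.assignment.strategy setting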
private final List<ConsumerPartitionAssignor> assignors;
// Consumer-side cluster metadata
private final ConsumerMetadata metadata;
private final ConsumerCoordinatorMetrics sensors;
// Subscription state; tracks the subscribed topics, the assigned partitions and their offsets
private final SubscriptionState subscriptions;
private final OffsetCommitCallback defaultOffsetCommitCallback;
// Whether offsets are committed automatically
private final boolean autoCommitEnabled;
// Interval between automatic offset commits
private final int autoCommitIntervalMs;
private final ConsumerInterceptors<?, ?> interceptors;
private final AtomicInteger pendingAsyncCommits;
// this collection must be thread-safe because it is modified from the response handler
// of offset commit requests, which may be invoked from the heartbeat thread
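// Queue of completed offset commits: after an offset commit request has been sent and answered,
// its OffsetCommitCompletion (the callback that handles the response) is placed on this queue
private final ConcurrentLinkedQueue<OffsetCommitCompletion> completedOffsetCommits;
// Whether this consumer is the leader
private boolean isLeader = false;
private Set<String> joinedSubscription;
// Snapshot of partition metadata, used to detect whether partition information has changed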
private MetadataSnapshot metadataSnapshot;
private MetadataSnapshot assignmentSnapshot;
private Timer nextAutoCommitTimer;
private AtomicBoolean asyncCommitFenced;
// Consumer group metadata
private ConsumerGroupMetadata groupMetadata;
private final boolean throwOnFetchStableOffsetsUnsupported;
}
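Phase one
The first step of a rebalance is to find the Group Coordinator. In this phase the consumer sends a FindCoordinatorRequest to a broker in the Kafka cluster (the least-loaded one) and handles the returned response with FindCoordinatorResponseHandler. The entry point for sending the request is org.apache.kafka.clients.consumer.internals.AbstractCoordinator#ensureCoordinatorReady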
protected synchronized boolean ensureCoordinatorReady(final Timer timer) {
// Return immediately if the Group Coordinator is already known and reachable
if (!coordinatorUnknown())
return true;
do {
if (fatalFindCoordinatorException != null) {
final RuntimeException fatalException = fatalFindCoordinatorException;
fatalFindCoordinatorException = null;
throw fatalException;
}
// Otherwise start looking up the Group Coordinator
final RequestFuture<Void> future = lookupCoordinator();
// Poll until the lookup future completes or the timer expires
client.poll(future, timer);
// Leave the loop on timeout
if (!future.isDone()) {
// ran out of time
break;
}
RuntimeException fatalException = null;
// Error handling
if (future.failed()) {
if (future.isRetriable()) {
log.debug("Coordinator discovery failed, refreshing metadata", future.exception());
// On a RetriableException, call awaitMetadataUpdate() to block until the cluster metadata held in Metadata has been refreshed,
// then retry the lookup
client.awaitMetadataUpdate(timer);
} else {
fatalException = future.exception();
log.info("FindCoordinator request hit fatal exception", fatalException);
}
// The Group Coordinator was found, but the network connection to it failed
} else if (coordinator != null && client.isUnavailable(coordinator)) {
// Mark the Group Coordinator as unknown and look it up again after a backoff
markCoordinatorUnknown("coordinator unavailable");
timer.sleep(rebalanceConfig.retryBackoffMs);
}
// Clear the lookup future so that the next iteration sends a new request
clearFindCoordinatorFuture();
if (fatalException != null)
throw fatalException;
// Keep trying to find the Group Coordinator until it succeeds or the timer expires
} while (coordinatorUnknown() && timer.notExpired());
return !coordinatorUnknown();
}
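Stripped of the Kafka specifics, ensureCoordinatorReady() is a retry loop bounded by a deadline with a backoff between attempts. A minimal sketch of that pattern follows; `RetryUntilDeadline` and its parameters are hypothetical, and a BooleanSupplier stands in for one lookup attempt.

```java
import java.util.function.BooleanSupplier;

final class RetryUntilDeadline {
    // Repeats the attempt until it succeeds or timeoutMs has elapsed,
    // sleeping backoffMs (or whatever time is left) between attempts.
    static boolean ensureReady(BooleanSupplier attempt, long timeoutMs, long backoffMs) {
        long deadlineNanos = System.nanoTime() + timeoutMs * 1_000_000L;
        do {
            if (attempt.getAsBoolean()) return true;
            long remainingMs = (deadlineNanos - System.nanoTime()) / 1_000_000L;
            if (remainingMs <= 0) break; // ran out of time
            try {
                Thread.sleep(Math.min(backoffMs, remainingMs));
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // preserve the interrupt and give up
                return false;
            }
        } while (System.nanoTime() - deadlineNanos < 0);
        return false;
    }

    public static void main(String[] args) {
        int[] calls = {0};
        boolean ready = ensureReady(() -> ++calls[0] >= 3, 1000, 10);
        System.out.println("ready=" + ready + " after " + calls[0] + " attempts");
    }
}
```

As in the Kafka method, the caller gets a plain boolean back and decides what to do when the deadline passes without success.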
Next, let's look at the implementation of lookupCoordinator
protected synchronized RequestFuture<Void> lookupCoordinator() {
if (findCoordinatorFuture == null) {
// find a node to ask about the coordinator
// Pick the least-loaded node
Node node = this.client.leastLoadedNode();
if (node == null) {
log.debug("No broker available to send FindCoordinator request");
return RequestFuture.noBrokersAvailable();
} else {
// Build the request and put it into the unsent buffer
findCoordinatorFuture = sendFindCoordinatorRequest(node);
}
}
return findCoordinatorFuture;
}
private RequestFuture<Void> sendFindCoordinatorRequest(Node node) {
// initiate the group metadata request
log.debug("Sending FindCoordinator request to broker {}", node);
// Build the request that looks up the Group Coordinator
FindCoordinatorRequest.Builder requestBuilder =
new FindCoordinatorRequest.Builder(
new FindCoordinatorRequestData()
.setKeyType(CoordinatorType.GROUP.id())
.setKey(this.rebalanceConfig.groupId));
// Send the request and handle the response with a FindCoordinatorResponseHandler
return client.send(node, requestBuilder)
.compose(new FindCoordinatorResponseHandler());
}
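Here is the request of phase one
The FindCoordinatorResponseHandler class handles the response
First look at the response itself; it clearly shows the host and port of the Node where the Group Coordinator runs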
private class FindCoordinatorResponseHandler extends RequestFutureAdapter<ClientResponse, Void> {
@Override
public void onSuccess(ClientResponse resp, RequestFuture<Void> future) {
log.debug("Received FindCoordinator response {}", resp);
// The lookup of the Group Coordinator succeeded; this is the broker's response
FindCoordinatorResponse findCoordinatorResponse = (FindCoordinatorResponse) resp.responseBody();
Errors error = findCoordinatorResponse.error();
if (error == Errors.NONE) {
synchronized (AbstractCoordinator.this) {
// Use Integer.MAX_VALUE - node.id as the coordinator id so that a separate connection is used for it
int coordinatorConnectionId = Integer.MAX_VALUE - findCoordinatorResponse.data().nodeId();
// Build the Node object for the Group Coordinator
AbstractCoordinator.this.coordinator = new Node(
coordinatorConnectionId,
findCoordinatorResponse.data().host(),
findCoordinatorResponse.data().port());
log.info("Discovered group coordinator {}", coordinator);
// Try to connect to the Group Coordinator
client.tryConnect(coordinator);
heartbeat.resetSessionTimeout();
}
// Propagate completion to the downstream future
future.complete(null);
} else if (error == Errors.GROUP_AUTHORIZATION_FAILED) {
future.raise(GroupAuthorizationException.forGroupId(rebalanceConfig.groupId));
} else {
log.debug("Group coordinator lookup failed: {}", findCoordinatorResponse.data().errorMessage());
future.raise(error);
}
}
@Override
public void onFailure(RuntimeException e, RequestFuture<Void> future) {
......
}
}
Phase two
After the Group Coordinator has been found, the consumer enters the Join Group phase, in which it sends a JoinGroupRequest to the Group Coordinator and handles the response
boolean joinGroupIfNeeded(final Timer timer) {
while (rejoinNeededOrPending()) {
// As usual, first check again that the Group Coordinator is known and connected
if (!ensureCoordinatorReady(timer)) {
return false;
}
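// The needsJoinPrepare flag prevents onJoinPrepare() from being called a second time
if (needsJoinPrepare) {
// Clear the flag so that a retry after a failed join does not run the prepare step again
needsJoinPrepare = false;
// Commit offsets and invoke the user's ConsumerRebalanceListener; some applications want to do work before a rebalance,
// for example saving the consumed offsets to avoid reprocessing records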
onJoinPrepare(generation.generationId, generation.memberId);
}
// Initialize the JoinGroup request and send it
final RequestFuture<ByteBuffer> future = initiateJoinGroup();
client.poll(future, timer);
if (!future.isDone()) {
// we ran out of time
return false;
}
// If the asynchronous request succeeded
if (future.succeeded()) {
......
}
}
return true;
}
private synchronized RequestFuture<ByteBuffer> initiateJoinGroup() {
if (joinFuture == null) {
// Set the consumer state to PREPARING_REBALANCE to mark the start of the rebalance
state = MemberState.PREPARING_REBALANCE;
// A rebalance can be triggered again right after a failed one;
// in that case the start time is not updated
if (lastRebalanceStartMs == -1L)
lastRebalanceStartMs = time.milliseconds();
// Send the JoinGroupRequest
joinFuture = sendJoinGroupRequest();
// Add a listener to joinFuture
joinFuture.addListener(new RequestFutureListener<ByteBuffer>() {
@Override
public void onSuccess(ByteBuffer value) {
// Nothing to do on success: the logic lives in JoinGroupResponseHandler, the response handler of the JoinGroupRequest
}
@Override
public void onFailure(RuntimeException e) {
// we handle failures below after the request finishes. if the join completes
// after having been woken up, the exception is ignored and we will rejoin;
// this can be triggered when either join or sync request failed
synchronized (AbstractCoordinator.this) {
sensors.failedRebalanceSensor.record();
}
}
});
}
return joinFuture;
}
RequestFuture<ByteBuffer> sendJoinGroupRequest() {
if (coordinatorUnknown())
return RequestFuture.coordinatorNotAvailable();
// send a join group request to the coordinator
log.info("(Re-)joining group");
// Build the request to join the group
JoinGroupRequest.Builder requestBuilder = new JoinGroupRequest.Builder(
new JoinGroupRequestData()
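// The group.id setting
.setGroupId(rebalanceConfig.groupId)
// The consumer sends heartbeats periodically to prove it is alive; if the broker receives none within this time, it removes the consumer from the group.
// Note that this value must lie between the broker settings group.min.session.timeout.ms and group.max.session.timeout.ms
.setSessionTimeoutMs(this.rebalanceConfig.sessionTimeoutMs)
.setMemberId(this.generation.memberId)
// A user-provided unique identifier for the consumer instance. If set, the consumer is treated as a static member; combined with a larger
// session timeout, static membership avoids rebalances caused by temporary unavailability such as a restart. If not set, the consumer is a dynamic member
.setGroupInstanceId(this.rebalanceConfig.groupInstanceId.orElse(null))
// The protocol type; for consumers it is "consumer"
.setProtocolType(protocolType())
// The assignment strategies and the corresponding subscription metadata
.setProtocols(metadata())
// For consumers this is max.poll.interval.ms, 300000 (5 minutes) by default. If the time between two poll() calls exceeds it, the consumer is
// considered too slow, a rebalance is triggered and its partitions are reassigned to other consumers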
.setRebalanceTimeoutMs(this.rebalanceConfig.rebalanceTimeoutMs)
);
log.debug("Sending JoinGroup ({}) to coordinator {}", requestBuilder, this.coordinator);
// Note that we override the request timeout using the rebalance timeout since that is the
// maximum time that it may block on the coordinator. We add an extra 5 seconds for small delays.
int joinGroupTimeoutMs = Math.max(client.defaultRequestTimeoutMs(),
rebalanceConfig.rebalanceTimeoutMs + JOIN_GROUP_TIMEOUT_LAPSE);
// Send the request and set the handler for its response
return client.send(coordinator, requestBuilder, joinGroupTimeoutMs)
.compose(new JoinGroupResponseHandler(generation));
}
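The request timeout used for JoinGroup is a small piece of arithmetic worth spelling out: the rebalance timeout plus a grace period, but never less than the default request timeout. In the sketch below the 5000 ms lapse follows the "extra 5 seconds" mentioned in the source comment; `JoinTimeout` is a hypothetical helper, and the sample values are the defaults quoted in this article.

```java
final class JoinTimeout {
    // Grace period added on top of the rebalance timeout ("extra 5 seconds" in the source comment).
    static final int JOIN_GROUP_TIMEOUT_LAPSE = 5000;

    static int joinGroupTimeoutMs(int defaultRequestTimeoutMs, int rebalanceTimeoutMs) {
        return Math.max(defaultRequestTimeoutMs, rebalanceTimeoutMs + JOIN_GROUP_TIMEOUT_LAPSE);
    }

    public static void main(String[] args) {
        // With request.timeout.ms = 30000 and a rebalance timeout of 300000:
        System.out.println(joinGroupTimeoutMs(30000, 300000)); // 305000
        // With a short rebalance timeout the default request timeout wins:
        System.out.println(joinGroupTimeoutMs(30000, 10000));  // 30000
    }
}
```

The override matters because the coordinator may legitimately hold the JoinGroup response for the whole rebalance timeout while it waits for the other members.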
Here is what the request looks like
Here is what the response looks like
private class JoinGroupResponseHandler extends CoordinatorResponseHandler<JoinGroupResponse, ByteBuffer> {
private JoinGroupResponseHandler(final Generation generation) {
super(generation);
}
@Override
public void handle(JoinGroupResponse joinResponse, RequestFuture<ByteBuffer> future) {
Errors error = joinResponse.error();
if (error == Errors.NONE) {
// The protocol type does not match
if (isProtocolTypeInconsistent(joinResponse.data().protocolType())) {
log.error("JoinGroup failed: Inconsistent Protocol Type, received {} but expected {}",
joinResponse.data().protocolType(), protocolType());
future.raise(Errors.INCONSISTENT_GROUP_PROTOCOL);
} else {
log.debug("Received successful JoinGroup response: {}", joinResponse);
sensors.joinSensor.record(response.requestLatencyMs());
synchronized (AbstractCoordinator.this) {
if (state != MemberState.PREPARING_REBALANCE) {
future.raise(new UnjoinedGroupException());
} else {
// Update the state to COMPLETING_REBALANCE
state = MemberState.COMPLETING_REBALANCE;
// Enable the heartbeat thread
if (heartbeatThread != null)
heartbeatThread.enable();
AbstractCoordinator.this.generation = new Generation(
joinResponse.data().generationId(),
joinResponse.data().memberId(), joinResponse.data().protocolName());
log.info("Successfully joined group with generation {}", AbstractCoordinator.this.generation);
// Check whether this consumer is the leader
if (joinResponse.isLeader()) {
// The leader computes the new assignment and sends it to the Group Coordinator
onJoinLeader(joinResponse).chain(future);
} else {
onJoinFollower().chain(future);
}
}
}
}
}
......
}
Let's focus on what the leader does
private RequestFuture<ByteBuffer> onJoinLeader(JoinGroupResponse joinResponse) {
try {
// perform the leader synchronization and send back the assignment for the group
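// Using the assignment strategy from the response and the subscription information of all consumers, compute the partitions assigned to each consumer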
Map<String, ByteBuffer> groupAssignment = performAssignment(joinResponse.data().leader(), joinResponse.data().protocolName(),
joinResponse.data().members());
List<SyncGroupRequestData.SyncGroupRequestAssignment> groupAssignmentList = new ArrayList<>();
for (Map.Entry<String, ByteBuffer> assignment : groupAssignment.entrySet()) {
groupAssignmentList.add(new SyncGroupRequestData.SyncGroupRequestAssignment()
.setMemberId(assignment.getKey())
.setAssignment(Utils.toArray(assignment.getValue()))
);
}
// Build the request that carries the partition assignment
SyncGroupRequest.Builder requestBuilder =
new SyncGroupRequest.Builder(
new SyncGroupRequestData()
.setGroupId(rebalanceConfig.groupId)
.setMemberId(generation.memberId)
.setProtocolType(protocolType())
.setProtocolName(generation.protocolName)
.setGroupInstanceId(this.rebalanceConfig.groupInstanceId.orElse(null))
.setGenerationId(generation.generationId)
.setAssignments(groupAssignmentList)
);
log.debug("Sending leader SyncGroup to coordinator {} at generation {}: {}", this.coordinator, this.generation, requestBuilder);
// Send the request that carries the partition assignment
return sendSyncGroupRequest(requestBuilder);
} catch (RuntimeException e) {
return RequestFuture.failure(e);
}
}
private RequestFuture<ByteBuffer> sendSyncGroupRequest(SyncGroupRequest.Builder requestBuilder) {
if (coordinatorUnknown())
return RequestFuture.coordinatorNotAvailable();
return client.send(coordinator, requestBuilder)
.compose(new SyncGroupResponseHandler(generation));
}
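Here is what the phase-three request looks like
Phase three
Once the partition assignment has been computed, the consumer enters the Synchronizing Group State phase. The main logic is to send a SyncGroupRequest to the Group Coordinator and handle the SyncGroupResponse
Here is what the message looks like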
public void handle(SyncGroupResponse syncResponse,
RequestFuture<ByteBuffer> future) {
Errors error = syncResponse.error();
// If the response carries no error
if (error == Errors.NONE) {
......
synchronized (AbstractCoordinator.this) {
if (!generation.equals(Generation.NO_GENERATION) && state == MemberState.COMPLETING_REBALANCE) {
......
} else {
log.info("Successfully synced group in generation {}", generation);
// Update the member state
state = MemberState.STABLE;
rejoinNeeded = false;
// Record the time at which the rebalance finished
lastRebalanceEndMs = time.milliseconds();
sensors.successfulRebalanceSensor.record(lastRebalanceEndMs - lastRebalanceStartMs);
lastRebalanceStartMs = -1L;
// Propagate the completion of the rebalance
future.complete(ByteBuffer.wrap(syncResponse.data.assignment()));
}
} else {
log.info("Generation data was cleared by heartbeat thread to {} and state is now {} before " +
"receiving SyncGroup response, marking this rebalance as failed and retry",
generation, state);
// use ILLEGAL_GENERATION error code to let it retry immediately
future.raise(Errors.ILLEGAL_GENERATION);
}
}
}
} else {
......
}
After the SyncGroupResponse has been handled, the following method is called to set the assigned partitions
protected void onJoinComplete(int generation,
String memberId,
String assignmentStrategy,
ByteBuffer assignmentBuffer) {
log.debug("Executing onJoinComplete with generation {} and memberId {}", generation, memberId);
// Only the leader consumer is responsible for monitoring metadata changes
if (!isLeader)
assignmentSnapshot = null;
// Look up the assignment strategy that was used
ConsumerPartitionAssignor assignor = lookupAssignor(assignmentStrategy);
if (assignor == null)
throw new IllegalStateException("Coordinator selected invalid assignment protocol: " + assignmentStrategy);
// Give the assignor a chance to update internal state based on the received assignment
groupMetadata = new ConsumerGroupMetadata(rebalanceConfig.groupId, generation, memberId, rebalanceConfig.groupInstanceId);
Set<TopicPartition> ownedPartitions = new HashSet<>(subscriptions.assignedPartitions());
// Deserialize the assignment
Assignment assignment = ConsumerProtocol.deserializeAssignment(assignmentBuffer);
// Get the assigned partitions
Set<TopicPartition> assignedPartitions = new HashSet<>(assignment.partitions());
......
final AtomicReference<Exception> firstException = new AtomicReference<>(null);
Set<TopicPartition> addedPartitions = new HashSet<>(assignedPartitions);
addedPartitions.removeAll(ownedPartitions);
......
// Set the assigned partitions
subscriptions.assignFromSubscribed(assignedPartitions);
......
}
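The addedPartitions computation in onJoinComplete() is a plain set difference: newly assigned partitions minus the ones already owned. The sketch below shows that step with strings in place of TopicPartition; `PartitionDiff` is hypothetical, and the "revoked" direction is included only as the symmetric case, since the corresponding part of the method is elided in the listing.

```java
import java.util.Arrays;
import java.util.Set;
import java.util.TreeSet;

final class PartitionDiff {
    // Returns the elements of left that are not in right, without modifying either input.
    static Set<String> minus(Set<String> left, Set<String> right) {
        Set<String> result = new TreeSet<>(left);
        result.removeAll(right);
        return result;
    }

    public static void main(String[] args) {
        Set<String> owned = new TreeSet<>(Arrays.asList("orders-0", "orders-1"));
        Set<String> assigned = new TreeSet<>(Arrays.asList("orders-1", "orders-2"));
        System.out.println("added   = " + minus(assigned, owned)); // [orders-2]
        System.out.println("revoked = " + minus(owned, assigned)); // [orders-0]
    }
}
```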
At this point the consumer can start consuming data
References
Reposted from: https://juejin.cn/post/7247324653839302715