in group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java [2311:2489]
private CoordinatorResult<Void, CoordinatorRecord> classicGroupJoinToConsumerGroup(
ConsumerGroup group,
AuthorizableRequestContext context,
JoinGroupRequestData request,
CompletableFuture<JoinGroupResponseData> responseFuture
) throws ApiException {
final long currentTimeMs = time.milliseconds();
final List<CoordinatorRecord> records = new ArrayList<>();
final String groupId = request.groupId();
final String instanceId = request.groupInstanceId();
final int sessionTimeoutMs = request.sessionTimeoutMs();
final JoinGroupRequestProtocolCollection protocols = request.protocols();
String memberId = request.memberId();
final boolean isUnknownMember = memberId.equals(UNKNOWN_MEMBER_ID);
if (isUnknownMember) memberId = Uuid.randomUuid().toString();
throwIfConsumerGroupIsFull(group, memberId);
throwIfClassicProtocolIsNotSupported(group, memberId, request.protocolType(), protocols);
if (JoinGroupRequest.requiresKnownMemberId(request, context.requestVersion())) {
// A dynamic member requiring a member id joins the group. Send back a response to call for another
// join group request with allocated member id.
responseFuture.complete(new JoinGroupResponseData()
.setMemberId(memberId)
.setErrorCode(Errors.MEMBER_ID_REQUIRED.code())
);
log.info("[GroupId {}] Dynamic member with unknown member id joins the consumer group. " +
"Created a new member id {} and requesting the member to rejoin with this id.", groupId, memberId);
return EMPTY_RESULT;
}
// Get or create the member.
final ConsumerGroupMember member;
if (instanceId == null) {
member = getOrMaybeSubscribeDynamicConsumerGroupMember(
group,
memberId,
-1,
List.of(),
true,
true
);
} else {
member = getOrMaybeSubscribeStaticConsumerGroupMember(
group,
memberId,
-1,
instanceId,
List.of(),
isUnknownMember,
true,
records
);
}
int groupEpoch = group.groupEpoch();
Map<String, TopicMetadata> subscriptionMetadata = group.subscriptionMetadata();
SubscriptionType subscriptionType = group.subscriptionType();
final ConsumerProtocolSubscription subscription = deserializeSubscription(protocols);
// 1. Create or update the member. If the member is new or has changed, a ConsumerGroupMemberMetadataValue
// record is written to the __consumer_offsets partition to persist the change. If the subscriptions have
// changed, the subscription metadata is updated and persisted by writing a ConsumerGroupPartitionMetadataValue
// record to the __consumer_offsets partition. Finally, the group epoch is bumped if the subscriptions have
// changed, and persisted by writing a ConsumerGroupMetadataValue record to the partition.
ConsumerGroupMember updatedMember = new ConsumerGroupMember.Builder(member)
.maybeUpdateInstanceId(Optional.ofNullable(instanceId))
.maybeUpdateRackId(Utils.toOptional(subscription.rackId()))
.maybeUpdateRebalanceTimeoutMs(ofSentinel(request.rebalanceTimeoutMs()))
.maybeUpdateServerAssignorName(Optional.empty())
.maybeUpdateSubscribedTopicNames(Optional.ofNullable(subscription.topics()))
.setClientId(context.clientId())
.setClientHost(context.clientAddress().toString())
.setClassicMemberMetadata(
new ConsumerGroupMemberMetadataValue.ClassicMemberMetadata()
.setSessionTimeoutMs(sessionTimeoutMs)
.setSupportedProtocols(ConsumerGroupMember.classicProtocolListFromJoinRequestProtocolCollection(protocols)))
.build();
boolean bumpGroupEpoch = hasMemberSubscriptionChanged(
groupId,
member,
updatedMember,
records
);
if (bumpGroupEpoch || group.hasMetadataExpired(currentTimeMs)) {
// The subscription metadata is updated in two cases:
// 1) The member has updated its subscriptions;
// 2) The refresh deadline has been reached.
UpdateSubscriptionMetadataResult result = updateSubscriptionMetadata(
group,
bumpGroupEpoch,
member,
updatedMember,
records
);
groupEpoch = result.groupEpoch;
subscriptionMetadata = result.subscriptionMetadata;
subscriptionType = result.subscriptionType;
}
// 2. Update the target assignment if the group epoch is larger than the target assignment epoch. The delta between
// the existing and the new target assignment is persisted to the partition.
final int targetAssignmentEpoch;
final Assignment targetAssignment;
if (groupEpoch > group.assignmentEpoch()) {
targetAssignment = updateTargetAssignment(
group,
groupEpoch,
member,
updatedMember,
subscriptionMetadata,
subscriptionType,
records
);
targetAssignmentEpoch = groupEpoch;
} else {
targetAssignmentEpoch = group.assignmentEpoch();
targetAssignment = group.targetAssignment(updatedMember.memberId(), updatedMember.instanceId());
}
// 3. Reconcile the member's assignment with the target assignment if the member is not
// fully reconciled yet.
updatedMember = maybeReconcile(
groupId,
updatedMember,
group::currentPartitionEpoch,
targetAssignmentEpoch,
targetAssignment,
toTopicPartitions(subscription.ownedPartitions(), metadataImage.topics()),
records
);
// 4. Maybe downgrade the consumer group if the last static member using the
// consumer protocol is replaced by the joining static member.
ConsumerGroupMember existingStaticMemberOrNull = group.staticMember(request.groupInstanceId());
boolean downgrade = existingStaticMemberOrNull != null &&
validateOnlineDowngradeWithReplacedMember(group, existingStaticMemberOrNull);
if (downgrade) {
convertToClassicGroup(
group,
Set.of(),
updatedMember,
records
);
}
final JoinGroupResponseData response = new JoinGroupResponseData()
.setMemberId(updatedMember.memberId())
.setGenerationId(updatedMember.memberEpoch())
.setProtocolType(ConsumerProtocol.PROTOCOL_TYPE)
.setProtocolName(updatedMember.supportedClassicProtocols().get().iterator().next().name());
CompletableFuture<Void> appendFuture = new CompletableFuture<>();
appendFuture.whenComplete((__, t) -> {
if (t == null) {
cancelConsumerGroupJoinTimeout(groupId, response.memberId());
if (!downgrade) {
// If the group is still a consumer group, schedule the session
// timeout for the joining member and the sync timeout to ensure
// that the member send sync request within the rebalance timeout.
scheduleConsumerGroupSessionTimeout(groupId, response.memberId(), sessionTimeoutMs);
scheduleConsumerGroupSyncTimeout(groupId, response.memberId(), request.rebalanceTimeoutMs());
}
responseFuture.complete(response);
}
});
// If the joining member triggers a valid downgrade, the soft states will be directly
// updated in the conversion method, so the records don't need to be replayed.
// If the joining member doesn't trigger a valid downgrade, the group is still a
// consumer group. We still rely on replaying records to update the soft states.
return new CoordinatorResult<>(records, null, appendFuture, !downgrade);
}