private CoordinatorResult classicGroupJoinToConsumerGroup()

in group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java [2311:2489]


    private CoordinatorResult<Void, CoordinatorRecord> classicGroupJoinToConsumerGroup(
        ConsumerGroup group,
        AuthorizableRequestContext context,
        JoinGroupRequestData request,
        CompletableFuture<JoinGroupResponseData> responseFuture
    ) throws ApiException {
        final long currentTimeMs = time.milliseconds();
        final List<CoordinatorRecord> records = new ArrayList<>();
        final String groupId = request.groupId();
        final String instanceId = request.groupInstanceId();
        final int sessionTimeoutMs = request.sessionTimeoutMs();
        final JoinGroupRequestProtocolCollection protocols = request.protocols();

        String memberId = request.memberId();
        final boolean isUnknownMember = memberId.equals(UNKNOWN_MEMBER_ID);
        if (isUnknownMember) memberId = Uuid.randomUuid().toString();

        throwIfConsumerGroupIsFull(group, memberId);
        throwIfClassicProtocolIsNotSupported(group, memberId, request.protocolType(), protocols);

        if (JoinGroupRequest.requiresKnownMemberId(request, context.requestVersion())) {
            // A dynamic member requiring a member id joins the group. Send back a response to call for another
            // join group request with allocated member id.
            responseFuture.complete(new JoinGroupResponseData()
                .setMemberId(memberId)
                .setErrorCode(Errors.MEMBER_ID_REQUIRED.code())
            );
            log.info("[GroupId {}] Dynamic member with unknown member id joins the consumer group. " +
                "Created a new member id {} and requesting the member to rejoin with this id.", groupId, memberId);
            return EMPTY_RESULT;
        }

        // Get or create the member.
        final ConsumerGroupMember member;
        if (instanceId == null) {
            member = getOrMaybeSubscribeDynamicConsumerGroupMember(
                group,
                memberId,
                -1,
                List.of(),
                true,
                true
            );
        } else {
            member = getOrMaybeSubscribeStaticConsumerGroupMember(
                group,
                memberId,
                -1,
                instanceId,
                List.of(),
                isUnknownMember,
                true,
                records
            );
        }

        int groupEpoch = group.groupEpoch();
        Map<String, TopicMetadata> subscriptionMetadata = group.subscriptionMetadata();
        SubscriptionType subscriptionType = group.subscriptionType();
        final ConsumerProtocolSubscription subscription = deserializeSubscription(protocols);

        // 1. Create or update the member. If the member is new or has changed, a ConsumerGroupMemberMetadataValue
        // record is written to the __consumer_offsets partition to persist the change. If the subscriptions have
        // changed, the subscription metadata is updated and persisted by writing a ConsumerGroupPartitionMetadataValue
        // record to the __consumer_offsets partition. Finally, the group epoch is bumped if the subscriptions have
        // changed, and persisted by writing a ConsumerGroupMetadataValue record to the partition.
        ConsumerGroupMember updatedMember = new ConsumerGroupMember.Builder(member)
            .maybeUpdateInstanceId(Optional.ofNullable(instanceId))
            .maybeUpdateRackId(Utils.toOptional(subscription.rackId()))
            .maybeUpdateRebalanceTimeoutMs(ofSentinel(request.rebalanceTimeoutMs()))
            .maybeUpdateServerAssignorName(Optional.empty())
            .maybeUpdateSubscribedTopicNames(Optional.ofNullable(subscription.topics()))
            .setClientId(context.clientId())
            .setClientHost(context.clientAddress().toString())
            .setClassicMemberMetadata(
                new ConsumerGroupMemberMetadataValue.ClassicMemberMetadata()
                    .setSessionTimeoutMs(sessionTimeoutMs)
                    .setSupportedProtocols(ConsumerGroupMember.classicProtocolListFromJoinRequestProtocolCollection(protocols)))
            .build();

        boolean bumpGroupEpoch = hasMemberSubscriptionChanged(
            groupId,
            member,
            updatedMember,
            records
        );

        if (bumpGroupEpoch || group.hasMetadataExpired(currentTimeMs)) {
            // The subscription metadata is updated in two cases:
            // 1) The member has updated its subscriptions;
            // 2) The refresh deadline has been reached.
            UpdateSubscriptionMetadataResult result = updateSubscriptionMetadata(
                group,
                bumpGroupEpoch,
                member,
                updatedMember,
                records
            );

            groupEpoch = result.groupEpoch;
            subscriptionMetadata = result.subscriptionMetadata;
            subscriptionType = result.subscriptionType;
        }

        // 2. Update the target assignment if the group epoch is larger than the target assignment epoch. The delta between
        // the existing and the new target assignment is persisted to the partition.
        final int targetAssignmentEpoch;
        final Assignment targetAssignment;

        if (groupEpoch > group.assignmentEpoch()) {
            targetAssignment = updateTargetAssignment(
                group,
                groupEpoch,
                member,
                updatedMember,
                subscriptionMetadata,
                subscriptionType,
                records
            );
            targetAssignmentEpoch = groupEpoch;
        } else {
            targetAssignmentEpoch = group.assignmentEpoch();
            targetAssignment = group.targetAssignment(updatedMember.memberId(), updatedMember.instanceId());

        }

        // 3. Reconcile the member's assignment with the target assignment if the member is not
        // fully reconciled yet.
        updatedMember = maybeReconcile(
            groupId,
            updatedMember,
            group::currentPartitionEpoch,
            targetAssignmentEpoch,
            targetAssignment,
            toTopicPartitions(subscription.ownedPartitions(), metadataImage.topics()),
            records
        );

        // 4. Maybe downgrade the consumer group if the last static member using the
        // consumer protocol is replaced by the joining static member.
        ConsumerGroupMember existingStaticMemberOrNull = group.staticMember(request.groupInstanceId());
        boolean downgrade = existingStaticMemberOrNull != null &&
            validateOnlineDowngradeWithReplacedMember(group, existingStaticMemberOrNull);
        if (downgrade) {
            convertToClassicGroup(
                group,
                Set.of(),
                updatedMember,
                records
            );
        }

        final JoinGroupResponseData response = new JoinGroupResponseData()
            .setMemberId(updatedMember.memberId())
            .setGenerationId(updatedMember.memberEpoch())
            .setProtocolType(ConsumerProtocol.PROTOCOL_TYPE)
            .setProtocolName(updatedMember.supportedClassicProtocols().get().iterator().next().name());

        CompletableFuture<Void> appendFuture = new CompletableFuture<>();
        appendFuture.whenComplete((__, t) -> {
            if (t == null) {
                cancelConsumerGroupJoinTimeout(groupId, response.memberId());
                if (!downgrade) {
                    // If the group is still a consumer group, schedule the session
                    // timeout for the joining member and the sync timeout to ensure
                    // that the member send sync request within the rebalance timeout.
                    scheduleConsumerGroupSessionTimeout(groupId, response.memberId(), sessionTimeoutMs);
                    scheduleConsumerGroupSyncTimeout(groupId, response.memberId(), request.rebalanceTimeoutMs());
                }
                responseFuture.complete(response);
            }
        });

        // If the joining member triggers a valid downgrade, the soft states will be directly
        // updated in the conversion method, so the records don't need to be replayed.
        // If the joining member doesn't trigger a valid downgrade, the group is still a
        // consumer group. We still rely on replaying records to update the soft states.
        return new CoordinatorResult<>(records, null, appendFuture, !downgrade);
    }