private CoordinatorResult streamsGroupHeartbeat()

in group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java [1829:2022]


    private CoordinatorResult<StreamsGroupHeartbeatResult, CoordinatorRecord> streamsGroupHeartbeat(
        String groupId,
        String memberId,
        int memberEpoch,
        String instanceId,
        String rackId,
        int rebalanceTimeoutMs,
        String clientId,
        String clientHost,
        StreamsGroupHeartbeatRequestData.Topology topology,
        List<TaskIds> ownedActiveTasks,
        List<TaskIds> ownedStandbyTasks,
        List<TaskIds> ownedWarmupTasks,
        String processId,
        Endpoint userEndpoint,
        List<KeyValue> clientTags,
        boolean shutdownApplication
    ) throws ApiException {
        final long currentTimeMs = time.milliseconds();
        final List<CoordinatorRecord> records = new ArrayList<>();
        final List<StreamsGroupHeartbeatResponseData.Status> returnedStatus = new ArrayList<>();

        // Get or create the streams group.
        boolean isJoining = memberEpoch == 0;
        StreamsGroup group;
        if (isJoining) {
            group = getOrCreateStreamsGroup(groupId);
            throwIfStreamsGroupIsFull(group);
        } else {
            group = getStreamsGroupOrThrow(groupId);
        }

        // Get or create the member.
        StreamsGroupMember member;
        if (instanceId == null) {
            member = getOrMaybeCreateDynamicStreamsGroupMember(
                group,
                memberId,
                memberEpoch,
                ownedActiveTasks,
                ownedStandbyTasks,
                ownedWarmupTasks,
                isJoining
            );
        } else {
            throw new UnsupportedOperationException("Static members are not supported yet.");
        }

        // 1. Create or update the member.
        StreamsGroupMember updatedMember = new StreamsGroupMember.Builder(member)
            .maybeUpdateInstanceId(Optional.empty())
            .maybeUpdateRackId(Optional.ofNullable(rackId))
            .maybeUpdateRebalanceTimeoutMs(ofSentinel(rebalanceTimeoutMs))
            .maybeUpdateTopologyEpoch(topology != null ? OptionalInt.of(topology.epoch()) : OptionalInt.empty())
            .setClientId(clientId)
            .setClientHost(clientHost)
            .maybeUpdateProcessId(Optional.ofNullable(processId))
            .maybeUpdateClientTags(Optional.ofNullable(clientTags).map(x -> x.stream().collect(Collectors.toMap(KeyValue::key, KeyValue::value))))
            .maybeUpdateUserEndpoint(Optional.ofNullable(userEndpoint).map(x -> new StreamsGroupMemberMetadataValue.Endpoint().setHost(x.host()).setPort(x.port())))
            .build();

        // If the member is new or has changed, a StreamsGroupMemberMetadataValue record is written to the __consumer_offsets partition
        // to persist the change, and bump the group epoch later.
        boolean bumpGroupEpoch = hasStreamsMemberMetadataChanged(groupId, member, updatedMember, records);

        // 2. Initialize/Update the group topology.
        // If the topology is new or has changed, a StreamsGroupTopologyValue record is written to the __consumer_offsets partition to persist
        // the change. The group epoch is bumped if the topology has changed.
        StreamsTopology updatedTopology = maybeUpdateTopology(groupId, memberId, topology, group, records);
        maybeSetTopologyStaleStatus(group, updatedMember, returnedStatus);

        // 3. Determine the partition metadata and any internal topics if needed.
        ConfiguredTopology updatedConfiguredTopology;
        Map<String, org.apache.kafka.coordinator.group.streams.TopicMetadata> updatedPartitionMetadata;
        boolean reconfigureTopology = group.topology().isEmpty();
        if (reconfigureTopology || group.hasMetadataExpired(currentTimeMs)) {

            updatedPartitionMetadata = group.computePartitionMetadata(
                metadataImage.topics(),
                updatedTopology
            );

            if (!updatedPartitionMetadata.equals(group.partitionMetadata())) {
                log.info("[GroupId {}][MemberId {}] Computed new partition metadata: {}.",
                    groupId, memberId, updatedPartitionMetadata);
                bumpGroupEpoch = true;
                reconfigureTopology = true;
                records.add(newStreamsGroupPartitionMetadataRecord(groupId, updatedPartitionMetadata));
                group.setPartitionMetadata(updatedPartitionMetadata);
            }

            if (reconfigureTopology || group.configuredTopology().isEmpty()) {
                log.info("[GroupId {}][MemberId {}] Configuring the topology {}", groupId, memberId, updatedTopology);
                updatedConfiguredTopology = InternalTopicManager.configureTopics(logContext, updatedTopology, updatedPartitionMetadata);
            } else {
                updatedConfiguredTopology = group.configuredTopology().get();
            }
        } else {
            updatedConfiguredTopology = group.configuredTopology().get();
            updatedPartitionMetadata = group.partitionMetadata();
        }

        // Actually bump the group epoch
        int groupEpoch = group.groupEpoch();
        if (bumpGroupEpoch) {
            groupEpoch += 1;
            records.add(newStreamsGroupEpochRecord(groupId, groupEpoch, 0));
            log.info("[GroupId {}][MemberId {}] Bumped streams group epoch to {}.", groupId, memberId, groupEpoch);
            metrics.record(STREAMS_GROUP_REBALANCES_SENSOR_NAME);
            group.setMetadataRefreshDeadline(currentTimeMs + METADATA_REFRESH_INTERVAL_MS, groupEpoch);
        }

        // 4. Update the target assignment if the group epoch is larger than the target assignment epoch or a static member
        // replaces an existing static member.
        // The delta between the existing and the new target assignment is persisted to the partition.
        int targetAssignmentEpoch;
        TasksTuple targetAssignment;
        if (groupEpoch > group.assignmentEpoch()) {
            targetAssignment = updateStreamsTargetAssignment(
                group,
                groupEpoch,
                updatedMember,
                updatedConfiguredTopology,
                updatedPartitionMetadata,
                records
            );
            targetAssignmentEpoch = groupEpoch;
        } else {
            targetAssignmentEpoch = group.assignmentEpoch();
            targetAssignment = group.targetAssignment(updatedMember.memberId());
        }

        // 5. Reconcile the member's assignment with the target assignment if the member is not
        // fully reconciled yet.
        updatedMember = maybeReconcile(
            groupId,
            updatedMember,
            group::currentActiveTaskProcessId,
            group::currentStandbyTaskProcessIds,
            group::currentWarmupTaskProcessIds,
            targetAssignmentEpoch,
            targetAssignment,
            ownedActiveTasks,
            ownedStandbyTasks,
            ownedWarmupTasks,
            records
        );

        scheduleStreamsGroupSessionTimeout(groupId, memberId);
        if (shutdownApplication) {
            group.setShutdownRequestMemberId(memberId);
        }

        // Prepare the response.
        StreamsGroupHeartbeatResponseData response = new StreamsGroupHeartbeatResponseData()
            .setMemberId(updatedMember.memberId())
            .setMemberEpoch(updatedMember.memberEpoch())
            .setHeartbeatIntervalMs(streamsGroupHeartbeatIntervalMs(groupId))
            .setPartitionsByUserEndpoint(maybeBuildEndpointToPartitions(group));

        // The assignment is only provided in the following cases:
        // 1. The member is joining.
        // 2. The member's assignment has been updated.
        if (memberEpoch == 0 || hasAssignedTasksChanged(member, updatedMember)) {
            response.setActiveTasks(createStreamsGroupHeartbeatResponseTaskIds(updatedMember.assignedTasks().activeTasks()));
            response.setStandbyTasks(createStreamsGroupHeartbeatResponseTaskIds(updatedMember.assignedTasks().standbyTasks()));
            response.setWarmupTasks(createStreamsGroupHeartbeatResponseTaskIds(updatedMember.assignedTasks().warmupTasks()));
        }

        Map<String, CreatableTopic> internalTopicsToBeCreated = Collections.emptyMap();
        if (updatedConfiguredTopology.topicConfigurationException().isPresent()) {
            TopicConfigurationException exception = updatedConfiguredTopology.topicConfigurationException().get();
            internalTopicsToBeCreated = updatedConfiguredTopology.internalTopicsToBeCreated();
            returnedStatus.add(
                new StreamsGroupHeartbeatResponseData.Status()
                    .setStatusCode(exception.status().code())
                    .setStatusDetail(exception.getMessage())
            );
        }

        group.getShutdownRequestMemberId().ifPresent(requestingMemberId -> returnedStatus.add(
            new Status()
                .setStatusCode(StreamsGroupHeartbeatResponse.Status.SHUTDOWN_APPLICATION.code())
                .setStatusDetail(
                    String.format("Streams group member %s encountered a fatal error and requested a shutdown for the entire application.",
                        requestingMemberId)
                )
        ));

        if (!returnedStatus.isEmpty()) {
            response.setStatus(returnedStatus);
        }
        return new CoordinatorResult<>(records, new StreamsGroupHeartbeatResult(response, internalTopicsToBeCreated));
    }