in group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java [1829:2022]
/**
 * Handles a streams group heartbeat for a single member.
 *
 * The steps run in a fixed order, and each one may append to the same list of records:
 * resolve the group and the member, build the updated member from the request fields,
 * update the topology, refresh the partition metadata and the configured topology when
 * needed, bump the group epoch if anything changed, update the target assignment when the
 * group epoch is ahead of the assignment epoch, reconcile the member, schedule the session
 * timeout and finally build the response.
 *
 * @param groupId               The group id.
 * @param memberId              The member id.
 * @param memberEpoch           The member epoch from the request; 0 is treated as a join.
 * @param instanceId            The instance id; must be null because static members are rejected.
 * @param rackId                The rack id or null.
 * @param rebalanceTimeoutMs    The rebalance timeout, converted with ofSentinel before being applied
 *                              (presumably a sentinel value means "not provided" - confirm against ofSentinel).
 * @param clientId              The client id; always set on the updated member.
 * @param clientHost            The client host; always set on the updated member.
 * @param topology              The topology from the request or null.
 * @param ownedActiveTasks      The active tasks reported by the member; used for the member lookup and the reconciliation.
 * @param ownedStandbyTasks     The standby tasks reported by the member; used for the member lookup and the reconciliation.
 * @param ownedWarmupTasks      The warm-up tasks reported by the member; used for the member lookup and the reconciliation.
 * @param processId             The process id or null.
 * @param userEndpoint          The user endpoint or null; its host and port are copied into the member metadata.
 * @param clientTags            The client tags or null; collected into a map from tag key to tag value.
 * @param shutdownApplication   Whether this member requests a shutdown; if true, the member id is stored on the group
 *                              as the shutdown requester.
 *
 * @return A CoordinatorResult holding the records collected by the steps above and a StreamsGroupHeartbeatResult
 *         wrapping the response and the internal topics to be created.
 *
 * @throws ApiException                  Declared on the signature; presumably raised by the lookup and validation
 *                                       helpers called here - confirm against those helpers.
 * @throws UnsupportedOperationException If instanceId is not null.
 */
private CoordinatorResult<StreamsGroupHeartbeatResult, CoordinatorRecord> streamsGroupHeartbeat(
String groupId,
String memberId,
int memberEpoch,
String instanceId,
String rackId,
int rebalanceTimeoutMs,
String clientId,
String clientHost,
StreamsGroupHeartbeatRequestData.Topology topology,
List<TaskIds> ownedActiveTasks,
List<TaskIds> ownedStandbyTasks,
List<TaskIds> ownedWarmupTasks,
String processId,
Endpoint userEndpoint,
List<KeyValue> clientTags,
boolean shutdownApplication
) throws ApiException {
final long currentTimeMs = time.milliseconds();
// Records are appended in the order of the steps below; this list is what is handed back in the result.
final List<CoordinatorRecord> records = new ArrayList<>();
final List<StreamsGroupHeartbeatResponseData.Status> returnedStatus = new ArrayList<>();
// Get or create the streams group.
// A member epoch of 0 marks a join: only then may the group be created, and only then is the
// group size check applied. Any other epoch requires the group to exist already.
boolean isJoining = memberEpoch == 0;
StreamsGroup group;
if (isJoining) {
group = getOrCreateStreamsGroup(groupId);
throwIfStreamsGroupIsFull(group);
} else {
group = getStreamsGroupOrThrow(groupId);
}
// Get or create the member.
StreamsGroupMember member;
if (instanceId == null) {
member = getOrMaybeCreateDynamicStreamsGroupMember(
group,
memberId,
memberEpoch,
ownedActiveTasks,
ownedStandbyTasks,
ownedWarmupTasks,
isJoining
);
} else {
throw new UnsupportedOperationException("Static members are not supported yet.");
}
// 1. Create or update the member.
// The instance id is always empty here because requests with an instance id were rejected above.
// Nullable request fields are wrapped in Optionals before being passed to the maybeUpdate* setters.
// NOTE(review): Collectors.toMap throws on duplicate tag keys and on null values - confirm that
// the request is validated before reaching this point.
StreamsGroupMember updatedMember = new StreamsGroupMember.Builder(member)
.maybeUpdateInstanceId(Optional.empty())
.maybeUpdateRackId(Optional.ofNullable(rackId))
.maybeUpdateRebalanceTimeoutMs(ofSentinel(rebalanceTimeoutMs))
.maybeUpdateTopologyEpoch(topology != null ? OptionalInt.of(topology.epoch()) : OptionalInt.empty())
.setClientId(clientId)
.setClientHost(clientHost)
.maybeUpdateProcessId(Optional.ofNullable(processId))
.maybeUpdateClientTags(Optional.ofNullable(clientTags).map(x -> x.stream().collect(Collectors.toMap(KeyValue::key, KeyValue::value))))
.maybeUpdateUserEndpoint(Optional.ofNullable(userEndpoint).map(x -> new StreamsGroupMemberMetadataValue.Endpoint().setHost(x.host()).setPort(x.port())))
.build();
// If the member is new or has changed, a StreamsGroupMemberMetadataValue record is written to the __consumer_offsets partition
// to persist the change, and bump the group epoch later.
boolean bumpGroupEpoch = hasStreamsMemberMetadataChanged(groupId, member, updatedMember, records);
// 2. Initialize/Update the group topology.
// If the topology is new or has changed, a StreamsGroupTopologyValue record is written to the __consumer_offsets partition to persist
// the change. The group epoch is bumped if the topology has changed.
StreamsTopology updatedTopology = maybeUpdateTopology(groupId, memberId, topology, group, records);
maybeSetTopologyStaleStatus(group, updatedMember, returnedStatus);
// 3. Determine the partition metadata and any internal topics if needed.
ConfiguredTopology updatedConfiguredTopology;
Map<String, org.apache.kafka.coordinator.group.streams.TopicMetadata> updatedPartitionMetadata;
// The partition metadata is recomputed when the group has no topology yet or when its metadata has expired.
boolean reconfigureTopology = group.topology().isEmpty();
if (reconfigureTopology || group.hasMetadataExpired(currentTimeMs)) {
updatedPartitionMetadata = group.computePartitionMetadata(
metadataImage.topics(),
updatedTopology
);
// Changed partition metadata bumps the group epoch, forces the topology to be configured again and is
// applied to the group object right away, in addition to being appended as a record.
if (!updatedPartitionMetadata.equals(group.partitionMetadata())) {
log.info("[GroupId {}][MemberId {}] Computed new partition metadata: {}.",
groupId, memberId, updatedPartitionMetadata);
bumpGroupEpoch = true;
reconfigureTopology = true;
records.add(newStreamsGroupPartitionMetadataRecord(groupId, updatedPartitionMetadata));
group.setPartitionMetadata(updatedPartitionMetadata);
}
// The topology is also configured when the group has no configured topology at all.
if (reconfigureTopology || group.configuredTopology().isEmpty()) {
log.info("[GroupId {}][MemberId {}] Configuring the topology {}", groupId, memberId, updatedTopology);
updatedConfiguredTopology = InternalTopicManager.configureTopics(logContext, updatedTopology, updatedPartitionMetadata);
} else {
updatedConfiguredTopology = group.configuredTopology().get();
}
} else {
// NOTE(review): Optional.get() throws if no configured topology is present. This branch assumes one always
// exists when the group has a topology and its metadata has not expired - confirm that invariant.
updatedConfiguredTopology = group.configuredTopology().get();
updatedPartitionMetadata = group.partitionMetadata();
}
// Actually bump the group epoch
// The refresh deadline is only moved forward when the epoch is bumped.
// NOTE(review): the meaning of the trailing 0 passed to newStreamsGroupEpochRecord is not visible here - confirm.
int groupEpoch = group.groupEpoch();
if (bumpGroupEpoch) {
groupEpoch += 1;
records.add(newStreamsGroupEpochRecord(groupId, groupEpoch, 0));
log.info("[GroupId {}][MemberId {}] Bumped streams group epoch to {}.", groupId, memberId, groupEpoch);
metrics.record(STREAMS_GROUP_REBALANCES_SENSOR_NAME);
group.setMetadataRefreshDeadline(currentTimeMs + METADATA_REFRESH_INTERVAL_MS, groupEpoch);
}
// 4. Update the target assignment if the group epoch is larger than the target assignment epoch.
// Static members are rejected above, so the epoch comparison is the only trigger here. Otherwise the
// existing target assignment of the member is reused.
// The delta between the existing and the new target assignment is persisted to the partition.
int targetAssignmentEpoch;
TasksTuple targetAssignment;
if (groupEpoch > group.assignmentEpoch()) {
targetAssignment = updateStreamsTargetAssignment(
group,
groupEpoch,
updatedMember,
updatedConfiguredTopology,
updatedPartitionMetadata,
records
);
targetAssignmentEpoch = groupEpoch;
} else {
targetAssignmentEpoch = group.assignmentEpoch();
targetAssignment = group.targetAssignment(updatedMember.memberId());
}
// 5. Reconcile the member's assignment with the target assignment if the member is not
// fully reconciled yet.
updatedMember = maybeReconcile(
groupId,
updatedMember,
group::currentActiveTaskProcessId,
group::currentStandbyTaskProcessIds,
group::currentWarmupTaskProcessIds,
targetAssignmentEpoch,
targetAssignment,
ownedActiveTasks,
ownedStandbyTasks,
ownedWarmupTasks,
records
);
// 6. Schedule the session timeout for the member.
scheduleStreamsGroupSessionTimeout(groupId, memberId);
// Store the shutdown request on the group object; it is read back below when the statuses are built.
if (shutdownApplication) {
group.setShutdownRequestMemberId(memberId);
}
// 7. Prepare the response.
StreamsGroupHeartbeatResponseData response = new StreamsGroupHeartbeatResponseData()
.setMemberId(updatedMember.memberId())
.setMemberEpoch(updatedMember.memberEpoch())
.setHeartbeatIntervalMs(streamsGroupHeartbeatIntervalMs(groupId))
.setPartitionsByUserEndpoint(maybeBuildEndpointToPartitions(group));
// The assignment is only provided in the following cases:
// 1. The member is joining.
// 2. The member's assignment has been updated.
if (memberEpoch == 0 || hasAssignedTasksChanged(member, updatedMember)) {
response.setActiveTasks(createStreamsGroupHeartbeatResponseTaskIds(updatedMember.assignedTasks().activeTasks()));
response.setStandbyTasks(createStreamsGroupHeartbeatResponseTaskIds(updatedMember.assignedTasks().standbyTasks()));
response.setWarmupTasks(createStreamsGroupHeartbeatResponseTaskIds(updatedMember.assignedTasks().warmupTasks()));
}
// If the configured topology carries a topic configuration exception, report it as a status and return the
// internal topics to be created alongside the response. Otherwise no internal topics are returned.
Map<String, CreatableTopic> internalTopicsToBeCreated = Collections.emptyMap();
if (updatedConfiguredTopology.topicConfigurationException().isPresent()) {
TopicConfigurationException exception = updatedConfiguredTopology.topicConfigurationException().get();
internalTopicsToBeCreated = updatedConfiguredTopology.internalTopicsToBeCreated();
returnedStatus.add(
new StreamsGroupHeartbeatResponseData.Status()
.setStatusCode(exception.status().code())
.setStatusDetail(exception.getMessage())
);
}
// If a shutdown requester is stored on the group (possibly set by this very request), add a
// SHUTDOWN_APPLICATION status naming that member.
group.getShutdownRequestMemberId().ifPresent(requestingMemberId -> returnedStatus.add(
new Status()
.setStatusCode(StreamsGroupHeartbeatResponse.Status.SHUTDOWN_APPLICATION.code())
.setStatusDetail(
String.format("Streams group member %s encountered a fatal error and requested a shutdown for the entire application.",
requestingMemberId)
)
));
// The status field is only set when there is at least one status to report.
if (!returnedStatus.isEmpty()) {
response.setStatus(returnedStatus);
}
return new CoordinatorResult<>(records, new StreamsGroupHeartbeatResult(response, internalTopicsToBeCreated));
}