in group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/StreamsGroupMember.java [56:387]
public record StreamsGroupMember(String memberId,
Integer memberEpoch,
Integer previousMemberEpoch,
MemberState state,
Optional<String> instanceId,
Optional<String> rackId,
String clientId,
String clientHost,
Integer rebalanceTimeoutMs,
Integer topologyEpoch,
String processId,
Optional<StreamsGroupMemberMetadataValue.Endpoint> userEndpoint,
Map<String, String> clientTags,
TasksTuple assignedTasks,
TasksTuple tasksPendingRevocation) {
public StreamsGroupMember {
Objects.requireNonNull(memberId, "memberId cannot be null");
clientTags = clientTags != null ? Collections.unmodifiableMap(clientTags) : null;
}
/**
* A builder that facilitates the creation of a new member or the update of an existing one.
* <p>
* Please refer to the javadoc of {{@link StreamsGroupMember}} for the definition of the fields.
*/
public static class Builder {
private final String memberId;
private Integer memberEpoch = null;
private Integer previousMemberEpoch = null;
private MemberState state = null;
private Optional<String> instanceId = null;
private Optional<String> rackId = null;
private Integer rebalanceTimeoutMs = null;
private String clientId = null;
private String clientHost = null;
private Integer topologyEpoch = null;
private String processId = null;
private Optional<StreamsGroupMemberMetadataValue.Endpoint> userEndpoint = null;
private Map<String, String> clientTags = null;
private TasksTuple assignedTasks = null;
private TasksTuple tasksPendingRevocation = null;
public Builder(String memberId) {
this.memberId = Objects.requireNonNull(memberId, "memberId cannot be null");
}
public Builder(StreamsGroupMember member) {
Objects.requireNonNull(member, "member cannot be null");
this.memberId = member.memberId;
this.memberEpoch = member.memberEpoch;
this.previousMemberEpoch = member.previousMemberEpoch;
this.instanceId = member.instanceId;
this.rackId = member.rackId;
this.rebalanceTimeoutMs = member.rebalanceTimeoutMs;
this.clientId = member.clientId;
this.clientHost = member.clientHost;
this.topologyEpoch = member.topologyEpoch;
this.processId = member.processId;
this.userEndpoint = member.userEndpoint;
this.clientTags = member.clientTags;
this.state = member.state;
this.assignedTasks = member.assignedTasks;
this.tasksPendingRevocation = member.tasksPendingRevocation;
}
public Builder updateMemberEpoch(int memberEpoch) {
int currentMemberEpoch = this.memberEpoch;
this.memberEpoch = memberEpoch;
this.previousMemberEpoch = currentMemberEpoch;
return this;
}
public Builder setMemberEpoch(int memberEpoch) {
this.memberEpoch = memberEpoch;
return this;
}
public Builder setPreviousMemberEpoch(int previousMemberEpoch) {
this.previousMemberEpoch = previousMemberEpoch;
return this;
}
public Builder setInstanceId(String instanceId) {
this.instanceId = Optional.ofNullable(instanceId);
return this;
}
public Builder maybeUpdateInstanceId(Optional<String> instanceId) {
instanceId.ifPresent(this::setInstanceId);
return this;
}
public Builder setRackId(String rackId) {
this.rackId = Optional.ofNullable(rackId);
return this;
}
public Builder maybeUpdateRackId(Optional<String> rackId) {
rackId.ifPresent(this::setRackId);
return this;
}
public Builder setRebalanceTimeoutMs(int rebalanceTimeoutMs) {
this.rebalanceTimeoutMs = rebalanceTimeoutMs;
return this;
}
public Builder maybeUpdateRebalanceTimeoutMs(OptionalInt rebalanceTimeoutMs) {
this.rebalanceTimeoutMs = rebalanceTimeoutMs.orElse(this.rebalanceTimeoutMs);
return this;
}
public Builder setClientId(String clientId) {
this.clientId = clientId;
return this;
}
public Builder setClientHost(String clientHost) {
this.clientHost = clientHost;
return this;
}
public Builder setState(MemberState state) {
this.state = state;
return this;
}
public Builder setTopologyEpoch(int topologyEpoch) {
this.topologyEpoch = topologyEpoch;
return this;
}
public Builder maybeUpdateTopologyEpoch(OptionalInt topologyEpoch) {
this.topologyEpoch = topologyEpoch.orElse(this.topologyEpoch);
return this;
}
public Builder setProcessId(String processId) {
this.processId = processId;
return this;
}
public Builder maybeUpdateProcessId(Optional<String> processId) {
this.processId = processId.orElse(this.processId);
return this;
}
public Builder setUserEndpoint(StreamsGroupMemberMetadataValue.Endpoint userEndpoint) {
this.userEndpoint = Optional.ofNullable(userEndpoint);
return this;
}
public Builder maybeUpdateUserEndpoint(Optional<StreamsGroupMemberMetadataValue.Endpoint> userEndpoint) {
userEndpoint.ifPresent(this::setUserEndpoint);
return this;
}
public Builder setClientTags(Map<String, String> clientTags) {
this.clientTags = clientTags;
return this;
}
public Builder maybeUpdateClientTags(Optional<Map<String, String>> clientTags) {
this.clientTags = clientTags.orElse(this.clientTags);
return this;
}
public Builder setAssignedTasks(TasksTuple assignedTasks) {
this.assignedTasks = assignedTasks;
return this;
}
public Builder setTasksPendingRevocation(TasksTuple tasksPendingRevocation) {
this.tasksPendingRevocation = tasksPendingRevocation;
return this;
}
public Builder updateWith(StreamsGroupMemberMetadataValue record) {
setInstanceId(record.instanceId());
setRackId(record.rackId());
setClientId(record.clientId());
setClientHost(record.clientHost());
setRebalanceTimeoutMs(record.rebalanceTimeoutMs());
setTopologyEpoch(record.topologyEpoch());
setProcessId(record.processId());
setUserEndpoint(record.userEndpoint());
setClientTags(record.clientTags().stream().collect(Collectors.toMap(
StreamsGroupMemberMetadataValue.KeyValue::key,
StreamsGroupMemberMetadataValue.KeyValue::value
)));
return this;
}
public Builder updateWith(StreamsGroupCurrentMemberAssignmentValue record) {
setMemberEpoch(record.memberEpoch());
setPreviousMemberEpoch(record.previousMemberEpoch());
setState(MemberState.fromValue(record.state()));
setAssignedTasks(
new TasksTuple(
assignmentFromTaskIds(record.activeTasks()),
assignmentFromTaskIds(record.standbyTasks()),
assignmentFromTaskIds(record.warmupTasks())
)
);
setTasksPendingRevocation(
new TasksTuple(
assignmentFromTaskIds(record.activeTasksPendingRevocation()),
assignmentFromTaskIds(record.standbyTasksPendingRevocation()),
assignmentFromTaskIds(record.warmupTasksPendingRevocation())
)
);
return this;
}
private static Map<String, Set<Integer>> assignmentFromTaskIds(
List<StreamsGroupCurrentMemberAssignmentValue.TaskIds> topicPartitionsList
) {
return topicPartitionsList.stream().collect(Collectors.toMap(
StreamsGroupCurrentMemberAssignmentValue.TaskIds::subtopologyId,
taskIds -> Set.copyOf(taskIds.partitions())));
}
public static Builder withDefaults(String memberId) {
return new Builder(memberId)
.setRebalanceTimeoutMs(-1)
.setTopologyEpoch(-1)
.setInstanceId(null)
.setRackId(null)
.setProcessId("")
.setClientTags(Collections.emptyMap())
.setState(MemberState.STABLE)
.setMemberEpoch(0)
.setAssignedTasks(TasksTuple.EMPTY)
.setTasksPendingRevocation(TasksTuple.EMPTY)
.setUserEndpoint(null);
}
public StreamsGroupMember build() {
return new StreamsGroupMember(
memberId,
memberEpoch,
previousMemberEpoch,
state,
instanceId,
rackId,
clientId,
clientHost,
rebalanceTimeoutMs,
topologyEpoch,
processId,
userEndpoint,
clientTags,
assignedTasks,
tasksPendingRevocation
);
}
}
/**
* @return True if the member is in the Stable state and at the desired epoch.
*/
public boolean isReconciledTo(int targetAssignmentEpoch) {
return state == MemberState.STABLE && memberEpoch == targetAssignmentEpoch;
}
/**
* Creates a member description for the Streams group describe response from this member.
*
* @param targetAssignment The target assignment of this member in the corresponding group.
*
* @return The StreamsGroupMember mapped as StreamsGroupDescribeResponseData.Member.
*/
public StreamsGroupDescribeResponseData.Member asStreamsGroupDescribeMember(TasksTuple targetAssignment) {
final StreamsGroupDescribeResponseData.Assignment describedTargetAssignment =
new StreamsGroupDescribeResponseData.Assignment();
if (targetAssignment != null) {
describedTargetAssignment
.setActiveTasks(taskIdsFromMap(targetAssignment.activeTasks()))
.setStandbyTasks(taskIdsFromMap(targetAssignment.standbyTasks()))
.setWarmupTasks(taskIdsFromMap(targetAssignment.warmupTasks()));
}
return new StreamsGroupDescribeResponseData.Member()
.setMemberEpoch(memberEpoch)
.setMemberId(memberId)
.setAssignment(
new StreamsGroupDescribeResponseData.Assignment()
.setActiveTasks(taskIdsFromMap(assignedTasks.activeTasks()))
.setStandbyTasks(taskIdsFromMap(assignedTasks.standbyTasks()))
.setWarmupTasks(taskIdsFromMap(assignedTasks.warmupTasks())))
.setTargetAssignment(describedTargetAssignment)
.setClientHost(clientHost)
.setClientId(clientId)
.setInstanceId(instanceId.orElse(null))
.setRackId(rackId.orElse(null))
.setClientTags(clientTags.entrySet().stream().map(
entry -> new StreamsGroupDescribeResponseData.KeyValue()
.setKey(entry.getKey())
.setValue(entry.getValue())
).toList())
.setProcessId(processId)
.setTopologyEpoch(topologyEpoch)
.setUserEndpoint(
userEndpoint.map(
endpoint -> new StreamsGroupDescribeResponseData.Endpoint()
.setHost(endpoint.host())
.setPort(endpoint.port())
).orElse(null)
);
}
private static List<StreamsGroupDescribeResponseData.TaskIds> taskIdsFromMap(Map<String, Set<Integer>> tasks) {
List<StreamsGroupDescribeResponseData.TaskIds> taskIds = new ArrayList<>();
tasks.keySet().stream().sorted().forEach(subtopologyId -> {
taskIds.add(new StreamsGroupDescribeResponseData.TaskIds()
.setSubtopologyId(subtopologyId)
.setPartitions(tasks.get(subtopologyId).stream().sorted().toList()));
});
return taskIds;
}
/**
* @return True if the two provided members have different assigned tasks.
*/
public static boolean hasAssignedTasksChanged(StreamsGroupMember member1, StreamsGroupMember member2) {
return !member1.assignedTasks().equals(member2.assignedTasks());
}
}