in flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/dynamic/source/enumerator/StoppableKafkaEnumContextProxy.java [124:152]
public void assignSplits(SplitsAssignment<KafkaPartitionSplit> newSplitAssignments) {
    // Re-packages a per-cluster split assignment for the wrapped enumerator context:
    // every KafkaPartitionSplit is wrapped in a DynamicKafkaSourceSplit carrying this
    // proxy's kafkaClusterId, keyed by the same reader/subtask id as in the input.

    // Only walk the assignment to count splits when the message will actually be logged.
    if (logger.isInfoEnabled()) {
        long totalSplits = 0L;
        for (List<KafkaPartitionSplit> perReaderSplits :
                newSplitAssignments.assignment().values()) {
            totalSplits += perReaderSplits.size();
        }
        logger.info(
                "Assigning {} splits for cluster {}: {}",
                totalSplits,
                kafkaClusterId,
                newSplitAssignments);
    }

    Map<Integer, List<DynamicKafkaSourceSplit>> wrappedByReader = new HashMap<>();
    for (Map.Entry<Integer, List<KafkaPartitionSplit>> readerEntry :
            newSplitAssignments.assignment().entrySet()) {
        List<DynamicKafkaSourceSplit> wrappedSplits =
                readerEntry.getValue().stream()
                        .map(
                                partitionSplit ->
                                        new DynamicKafkaSourceSplit(
                                                kafkaClusterId, partitionSplit))
                        .collect(Collectors.toList());
        wrappedByReader.put(readerEntry.getKey(), wrappedSplits);
    }

    // Nothing is forwarded when the incoming assignment has no reader entries at all;
    // a reader mapped to an empty split list still results in a delegated call.
    if (wrappedByReader.isEmpty()) {
        return;
    }
    enumContext.assignSplits(new SplitsAssignment<>(wrappedByReader));
}