in flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/enumerator/KafkaSourceEnumerator.java [356:398]
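// Drains the splits queued for the given readers and hands them over through the
// enumerator context. For a bounded source that has finished partition discovery,
// it also signals the pending readers that no further splits will arrive.
// (Presumably this runs on the coordinator thread, as is usual for Flink
// SplitEnumerator callbacks; that is an assumption, not stated in this excerpt.)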
private void assignPendingPartitionSplits(Set<Integer> pendingReaders) {
    Map<Integer, List<KafkaPartitionSplit>> incrementalAssignment = new HashMap<>();

    // Check if there are any pending splits for the given readers
    for (int pendingReader : pendingReaders) {
        checkReaderRegistered(pendingReader);

        // Remove the pending assignment for the reader
        final Set<KafkaPartitionSplit> pendingAssignmentForReader =
                pendingPartitionSplitAssignment.remove(pendingReader);
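
        // The map entry is removed, not just read: this guarantees each pending
        // split is handed to a reader at most once, even if this method is
        // invoked again for the same reader.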
        if (pendingAssignmentForReader != null && !pendingAssignmentForReader.isEmpty()) {
            // Put pending assignment into incremental assignment
            incrementalAssignment
                    .computeIfAbsent(pendingReader, (ignored) -> new ArrayList<>())
                    .addAll(pendingAssignmentForReader);

            // Mark pending partitions as already assigned
            pendingAssignmentForReader.forEach(
                    split -> {
                        assignedPartitions.add(split.getTopicPartition());
                        unassignedInitialPartitions.remove(split.getTopicPartition());
                    });
        }
    }
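
    // SplitEnumeratorContext#assignSplits is incremental: each call only adds to
    // what the readers already own, so just the newly drained splits are sent here.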
    // Assign pending splits to readers
    if (!incrementalAssignment.isEmpty()) {
        LOG.info("Assigning splits to readers {}", incrementalAssignment);
        context.assignSplits(new SplitsAssignment<>(incrementalAssignment));
    }

    // If periodic partition discovery is disabled and the initial partition
    // discovery has finished, signal NoMoreSplitsEvent to the pending readers
    if (noMoreNewPartitionSplits && boundedness == Boundedness.BOUNDED) {
        LOG.debug(
                "No more KafkaPartitionSplits to assign. Sending NoMoreSplitsEvent to readers {}"
                        + " in consumer group {}.",
                pendingReaders,
                consumerGroupId);
        pendingReaders.forEach(context::signalNoMoreSplits);
    }
}
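
The drain-and-accumulate bookkeeping above can be illustrated with a minimal, self-contained sketch outside of Flink. Everything below (PendingAssignments, drainFor, plain String splits) is a hypothetical stand-in, not Flink API; only the remove/computeIfAbsent pattern mirrors the enumerator code.

import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

public class PendingAssignments {
    // Splits discovered but not yet handed to a reader, keyed by reader id.
    private final Map<Integer, Set<String>> pending = new HashMap<>();
    // Splits that have already been handed out, kept to avoid double assignment.
    private final Set<String> assigned = new HashSet<>();

    void addPending(int readerId, String split) {
        pending.computeIfAbsent(readerId, ignored -> new HashSet<>()).add(split);
    }

    // Drain the pending splits for the given readers into one incremental batch.
    // Using Map#remove rather than Map#get means a second drain for the same
    // reader finds nothing, so no split is ever handed out twice.
    Map<Integer, List<String>> drainFor(Set<Integer> readers) {
        Map<Integer, List<String>> incremental = new HashMap<>();
        for (int reader : readers) {
            Set<String> forReader = pending.remove(reader);
            if (forReader != null && !forReader.isEmpty()) {
                incremental
                        .computeIfAbsent(reader, ignored -> new ArrayList<>())
                        .addAll(forReader);
                assigned.addAll(forReader);
            }
        }
        return incremental;
    }

    public static void main(String[] args) {
        PendingAssignments pa = new PendingAssignments();
        pa.addPending(0, "topic-partition-0");
        pa.addPending(0, "topic-partition-1");
        pa.addPending(1, "topic-partition-2");
        System.out.println(pa.drainFor(Set.of(0, 1))); // each reader gets its splits
        System.out.println(pa.drainFor(Set.of(0)));    // {}, already drained
    }
}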