in flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/PulsarSourceEnumerator.java [247:284]
private void assignPendingPartitionSplits(List<Integer> pendingReaders) {
if (pendingReaders.isEmpty()) {
return;
}
// Validate the reader.
pendingReaders.forEach(
reader -> {
if (!context.registeredReaders().containsKey(reader)) {
throw new IllegalStateException(
"Reader " + reader + " is not registered to source coordinator");
}
});
// Assign splits to downstream readers.
splitAssigner
.createAssignment(pendingReaders)
.ifPresent(
assignments -> {
LOG.info(
"The split assignment results are: {}",
assignments.assignment());
context.assignSplits(assignments);
});
// If periodically partition discovery is turned off and the initializing discovery has done
// signal NoMoreSplitsEvent to pending readers.
for (Integer reader : pendingReaders) {
if (splitAssigner.noMoreSplits(reader)) {
LOG.debug(
"No more PulsarPartitionSplits to assign."
+ " Sending NoMoreSplitsEvent to reader {} in subscription {}.",
reader,
sourceConfiguration.getSubscriptionDesc());
context.signalNoMoreSplits(reader);
}
}
}