in flink-connector/flink-connector-gcp-pubsub/src/main/java/com/google/pubsub/flink/internal/source/enumerator/PubSubSplitEnumerator.java [85:101]
private void checkForUnassignedReaders() {
// Remove all assignments for readers that are no longer registered.
Set<Integer> registeredReaders = context.registeredReaders().keySet();
readersWithAssignments.keySet().removeIf(task -> !registeredReaders.contains(task));
// For all readers without an assignment, assign a new Split.
HashMap<Integer, List<SubscriptionSplit>> newAssignments = new HashMap<>();
for (Integer reader : registeredReaders) {
if (!readersWithAssignments.containsKey(reader)) {
SubscriptionSplit newSplit =
SubscriptionSplit.create(subscriptionName, Integer.toString(reader));
readersWithAssignments.put(reader, newSplit);
newAssignments.put(reader, Collections.singletonList(newSplit));
}
}
context.assignSplits(new SplitsAssignment<>(newAssignments));
}