private void checkForUnassignedReaders()

in flink-connector/flink-connector-gcp-pubsub/src/main/java/com/google/pubsub/flink/internal/source/enumerator/PubSubSplitEnumerator.java [85:101]


  private void checkForUnassignedReaders() {
    // Remove all assignments for readers that are no longer registered.
    Set<Integer> registeredReaders = context.registeredReaders().keySet();
    readersWithAssignments.keySet().removeIf(task -> !registeredReaders.contains(task));

    // For all readers without an assignment, assign a new Split.
    HashMap<Integer, List<SubscriptionSplit>> newAssignments = new HashMap<>();
    for (Integer reader : registeredReaders) {
      if (!readersWithAssignments.containsKey(reader)) {
        SubscriptionSplit newSplit =
            SubscriptionSplit.create(subscriptionName, Integer.toString(reader));
        readersWithAssignments.put(reader, newSplit);
        newAssignments.put(reader, Collections.singletonList(newSplit));
      }
    }
    context.assignSplits(new SplitsAssignment<>(newAssignments));
  }