private void assignPendingPartitionSplits()

in flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/PulsarSourceEnumerator.java [247:284]


    /**
     * Hands out splits to the given readers and notifies the ones that will receive no more.
     *
     * <p>An empty reader list is a no-op. Otherwise the method runs three steps in order:
     *
     * <ol>
     *   <li>every reader is checked against {@code context.registeredReaders()};
     *   <li>the assignment created by the split assigner, if present, is logged and passed to
     *       {@code context.assignSplits};
     *   <li>each reader for which {@code splitAssigner.noMoreSplits} returns {@code true} gets a
     *       no-more-splits signal through the context.
     * </ol>
     *
     * @param pendingReaders the readers to assign splits to; each one must be a key of {@code
     *     context.registeredReaders()}.
     * @throws IllegalStateException if a reader is not contained in the registered readers.
     */
    private void assignPendingPartitionSplits(List<Integer> pendingReaders) {
        if (pendingReaders.isEmpty()) {
            return;
        }

        // Fail on the first reader that is missing from the registered readers, before any
        // split is handed out.
        for (Integer readerId : pendingReaders) {
            boolean registered = context.registeredReaders().containsKey(readerId);
            if (!registered) {
                throw new IllegalStateException(
                        "Reader " + readerId + " is not registered to source coordinator");
            }
        }

        // Forward the assigner's result to the readers, when it produced one.
        splitAssigner
                .createAssignment(pendingReaders)
                .ifPresent(
                        splitsAssignment -> {
                            LOG.info(
                                    "The split assignment results are: {}",
                                    splitsAssignment.assignment());
                            context.assignSplits(splitsAssignment);
                        });

        // Readers that the assigner reports as exhausted are told so explicitly. Per the
        // original note, this is the case when periodic partition discovery is turned off and
        // the initial discovery has finished.
        pendingReaders.forEach(
                readerId -> {
                    if (!splitAssigner.noMoreSplits(readerId)) {
                        return;
                    }
                    LOG.debug(
                            "No more PulsarPartitionSplits to assign."
                                    + " Sending NoMoreSplitsEvent to reader {} in subscription {}.",
                            readerId,
                            sourceConfiguration.getSubscriptionDesc());
                    context.signalNoMoreSplits(readerId);
                });
    }