private void checkPartitionChanges()

in flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/PulsarSourceEnumerator.java [210:244]


    /**
     * Handles the outcome of a topic partition discovery round.
     *
     * <p>On success the fetched partitions are registered with the split assigner, the
     * subscription cursor is prepared on every partition the assigner hands back, and split
     * assignment is triggered for all readers currently registered in the enumerator context.
     *
     * @param fetchedPartitions the partitions produced by the discovery call; only read when
     *     {@code throwable} is {@code null}
     * @param throwable the failure raised by the discovery call, or {@code null} on success
     * @throws FlinkRuntimeException if {@code throwable} is non-null, or if a Pulsar admin call
     *     fails while preparing the cursor of a partition
     */
    private void checkPartitionChanges(Set<TopicPartition> fetchedPartitions, Throwable throwable) {
        if (throwable != null) {
            throw new FlinkRuntimeException(
                    "Failed to list subscribed topic partitions due to: " + throwable.getMessage(),
                    throwable);
        }

        // Register the fetched partitions with the split assigner (a single registration).
        // The assigner returns the partitions that still need a subscription cursor prepared;
        // presumably these are the ones it had not seen before - confirm against the assigner.
        List<TopicPartition> newPartitions =
                splitAssigner.registerTopicPartitions(fetchedPartitions);

        // These configuration values do not depend on the partition, so read them once.
        String subscriptionName = sourceConfiguration.getSubscriptionName();
        boolean resetCursor = sourceConfiguration.isResetSubscriptionCursor();

        // Prepare the subscription cursor on every partition returned by the assigner.
        for (TopicPartition partition : newPartitions) {
            String topic = partition.getFullTopicName();
            CursorPosition position =
                    startCursor.position(partition.getTopic(), partition.getPartitionId());

            try {
                if (resetCursor) {
                    // Configuration asks for the cursor to be reset to the start position.
                    position.seekPosition(pulsarAdmin, topic, subscriptionName);
                } else {
                    position.createInitialPosition(pulsarAdmin, topic, subscriptionName);
                }
            } catch (PulsarAdminException e) {
                // Keep the cause and say which partition/subscription failed, so the error is
                // actionable when many partitions are processed in one round.
                throw new FlinkRuntimeException(
                        "Failed to initialize the cursor of subscription '"
                                + subscriptionName
                                + "' on topic partition '"
                                + topic
                                + "'",
                        e);
            }
        }

        // Assign pending splits to a snapshot of the readers that are currently registered.
        List<Integer> registeredReaders = new ArrayList<>(context.registeredReaders().keySet());
        assignPendingPartitionSplits(registeredReaders);
    }