public void handleSplitsChanges()

in flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/reader/PulsarPartitionSplitReader.java [173:245]


    /**
     * Registers the single split this reader will consume and prepares the Pulsar consumer for it.
     *
     * <p>The steps are, in order: validate the change, remember the split, open the split with the
     * Pulsar admin client, seek the subscription to the split's latest consumed message id (when
     * one is present), and finally create the consumer for the split's partition.
     *
     * @param splitsChanges the change to apply; must be a {@link SplitsAddition} carrying exactly
     *     one split.
     * @throws UnsupportedOperationException if the change is not a {@link SplitsAddition}.
     * @throws IllegalStateException if a split has already been registered on this reader.
     * @throws IllegalArgumentException if the change does not carry exactly one split, or if the
     *     cursor reset fails while the source is configured with {@code FAIL_ON_MISMATCH}.
     * @throws FlinkRuntimeException if opening the split or creating the consumer fails.
     */
    public void handleSplitsChanges(SplitsChange<PulsarPartitionSplit> splitsChanges) {
        LOG.debug("Handle split changes {}", splitsChanges);

        // Only split additions are supported by this reader.
        if (!(splitsChanges instanceof SplitsAddition)) {
            throw new UnsupportedOperationException(
                    String.format(
                            "The SplitChange type of %s is not supported.",
                            splitsChanges.getClass()));
        }

        // A reader owns at most one split for its whole lifetime.
        if (registeredSplit != null) {
            throw new IllegalStateException("This split reader have assigned split.");
        }

        List<PulsarPartitionSplit> newSplits = splitsChanges.splits();
        Preconditions.checkArgument(
                newSplits.size() == 1, "This pulsar split reader only supports one split.");
        this.registeredSplit = newSplits.get(0);

        // Open stop cursor.
        try {
            registeredSplit.open(pulsarAdmin);
        } catch (Exception e) {
            throw new FlinkRuntimeException(e);
        }

        // Reset the start position before creating the consumer.
        MessageId latestConsumedId = registeredSplit.getLatestConsumedId();

        if (latestConsumedId != null) {
            LOG.info("Reset subscription position by the checkpoint {}", latestConsumedId);
            try {
                // For compatibility: the earliest/latest sentinels are sought inclusively, any
                // other message id exclusively. Compare by value rather than by reference, since
                // an id restored from a checkpoint is a freshly deserialized instance and would
                // never be identical to the static constants.
                boolean isSentinel =
                        MessageId.latest.equals(latestConsumedId)
                                || MessageId.earliest.equals(latestConsumedId);
                CursorPosition cursorPosition = new CursorPosition(latestConsumedId, isSentinel);

                String topicName = registeredSplit.getPartition().getFullTopicName();
                String subscriptionName = sourceConfiguration.getSubscriptionName();

                // Remove Consumer.seek() here for waiting for pulsar-client-all 2.12.0
                // See https://github.com/apache/pulsar/issues/16757 for more details.

                cursorPosition.seekPosition(pulsarAdmin, topicName, subscriptionName);
            } catch (PulsarAdminException e) {
                if (sourceConfiguration.getVerifyInitialOffsets() == FAIL_ON_MISMATCH) {
                    throw new IllegalArgumentException(e);
                } else {
                    // Any other verification mode only warns and keeps going.
                    // The exception is passed as the trailing argument (beyond the two
                    // placeholders), so an SLF4J logger attaches it, stack trace included,
                    // to the warning.
                    LOG.warn(
                            "Failed to reset cursor to {} on partition {}",
                            latestConsumedId,
                            registeredSplit.getPartition(),
                            e);
                }
            }
        }

        // Create pulsar consumer.
        try {
            this.pulsarConsumer = createPulsarConsumer(registeredSplit.getPartition());
        } catch (PulsarClientException e) {
            throw new FlinkRuntimeException(e);
        }

        LOG.info("Register split {} consumer for current reader.", registeredSplit);
    }