in flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/reader/PulsarPartitionSplitReader.java [173:245]
public void handleSplitsChanges(SplitsChange<PulsarPartitionSplit> splitsChanges) {
LOG.debug("Handle split changes {}", splitsChanges);
// Get all the partition assignments and stopping offsets.
if (!(splitsChanges instanceof SplitsAddition)) {
throw new UnsupportedOperationException(
String.format(
"The SplitChange type of %s is not supported.",
splitsChanges.getClass()));
}
if (registeredSplit != null) {
throw new IllegalStateException("This split reader have assigned split.");
}
List<PulsarPartitionSplit> newSplits = splitsChanges.splits();
Preconditions.checkArgument(
newSplits.size() == 1, "This pulsar split reader only supports one split.");
this.registeredSplit = newSplits.get(0);
// Open stop cursor.
try {
registeredSplit.open(pulsarAdmin);
} catch (Exception e) {
throw new FlinkRuntimeException(e);
}
// Reset the start position before creating the consumer.
MessageId latestConsumedId = registeredSplit.getLatestConsumedId();
if (latestConsumedId != null) {
LOG.info("Reset subscription position by the checkpoint {}", latestConsumedId);
try {
CursorPosition cursorPosition;
if (latestConsumedId == MessageId.latest
|| latestConsumedId == MessageId.earliest) {
// for compatibility
cursorPosition = new CursorPosition(latestConsumedId, true);
} else {
cursorPosition = new CursorPosition(latestConsumedId, false);
}
String topicName = registeredSplit.getPartition().getFullTopicName();
String subscriptionName = sourceConfiguration.getSubscriptionName();
// Remove Consumer.seek() here for waiting for pulsar-client-all 2.12.0
// See https://github.com/apache/pulsar/issues/16757 for more details.
cursorPosition.seekPosition(pulsarAdmin, topicName, subscriptionName);
} catch (PulsarAdminException e) {
if (sourceConfiguration.getVerifyInitialOffsets() == FAIL_ON_MISMATCH) {
throw new IllegalArgumentException(e);
} else {
// WARN_ON_MISMATCH would just print this warning message.
// No need to print the stacktrace.
LOG.warn(
"Failed to reset cursor to {} on partition {}",
latestConsumedId,
registeredSplit.getPartition(),
e);
}
}
}
// Create pulsar consumer.
try {
this.pulsarConsumer = createPulsarConsumer(registeredSplit.getPartition());
} catch (PulsarClientException e) {
throw new FlinkRuntimeException(e);
}
LOG.info("Register split {} consumer for current reader.", registeredSplit);
}