in flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/reader/PulsarSourceReader.java [139:157]
protected void onSplitFinished(Map<String, PulsarPartitionSplitState> finishedSplitIds) {
// Close all the finished splits.
for (String splitId : finishedSplitIds.keySet()) {
((PulsarSourceFetcherManager) splitFetcherManager).closeFetcher(splitId);
}
// We don't require new splits, all the splits are pre-assigned by source enumerator.
if (LOG.isDebugEnabled()) {
LOG.debug("onSplitFinished event: {}", finishedSplitIds);
}
for (Map.Entry<String, PulsarPartitionSplitState> entry : finishedSplitIds.entrySet()) {
PulsarPartitionSplitState state = entry.getValue();
MessageId latestConsumedId = state.getLatestConsumedId();
if (latestConsumedId != null) {
cursorsOfFinishedSplits.put(state.getPartition(), latestConsumedId);
}
}
}