in paimon-core/src/main/java/org/apache/paimon/table/source/AbstractInnerTableScan.java [79:164]
/**
 * Selects the {@link StartingScanner} to use for this scan.
 *
 * <p>The decision is made in three stages, and the first stage that matches wins:
 *
 * <ol>
 *   <li>the {@code STREAMING_COMPACT} option ({@code NORMAL} or {@code BUCKET_UNAWARE});
 *   <li>a configured consumer id for which the consumer manager returns a consumer;
 *   <li>the configured startup mode.
 * </ol>
 *
 * @param isStreaming whether the scan runs in streaming mode; several modes pick a different
 *     scanner for streaming and batch, and some are checked to be valid in only one of them
 * @return the starting scanner matching the current options
 * @throws UnsupportedOperationException if the startup mode is not handled here
 */
protected StartingScanner createStartingScanner(boolean isStreaming) {
    // Stage 1: a dedicated compaction job overrides every other setting.
    CoreOptions.StreamingCompactionType compactionType =
            options.toConfiguration().get(CoreOptions.STREAMING_COMPACT);
    switch (compactionType) {
        case NORMAL:
            checkArgument(
                    isStreaming, "Set 'streaming-compact' in batch mode. This is unexpected.");
            return new ContinuousCompactorStartingScanner();
        case BUCKET_UNAWARE:
            return new FullStartingScanner();
        default:
            // Any other compaction type: continue with the regular selection below.
            break;
    }

    // Stage 2: resume from the consumer's next snapshot when one is returned for the id.
    String consumerId = options.consumerId();
    if (consumerId != null) {
        Optional<Consumer> recorded = snapshotReader.consumerManager().consumer(consumerId);
        if (recorded.isPresent()) {
            return new ContinuousFromSnapshotStartingScanner(recorded.get().nextSnapshot());
        }
    }

    // Stage 3: fall back to the configured startup mode.
    CoreOptions.StartupMode mode = options.startupMode();
    switch (mode) {
        case LATEST_FULL:
            return new FullStartingScanner();

        case LATEST:
            if (isStreaming) {
                return new ContinuousLatestStartingScanner();
            }
            return new FullStartingScanner();

        case COMPACTED_FULL:
            // Without full-compaction changelog and without an explicit delta-commits
            // setting, the plain compacted scanner is used.
            if (options.changelogProducer() != ChangelogProducer.FULL_COMPACTION
                    && !options.toConfiguration().contains(FULL_COMPACTION_DELTA_COMMITS)) {
                return new CompactedStartingScanner();
            }
            int deltaCommits =
                    options.toConfiguration()
                            .getOptional(FULL_COMPACTION_DELTA_COMMITS)
                            .orElse(1);
            return new FullCompactedStartingScanner(deltaCommits);

        case FROM_TIMESTAMP:
            Long startTimestamp = options.scanTimestampMills();
            if (isStreaming) {
                return new ContinuousFromTimestampStartingScanner(startTimestamp);
            }
            return new StaticFromTimestampStartingScanner(startTimestamp);

        case FROM_SNAPSHOT:
            if (options.scanSnapshotId() == null) {
                // No snapshot id configured: read from the configured tag (batch only).
                checkArgument(!isStreaming, "Cannot scan from tag in streaming mode.");
                return new StaticFromTagStartingScanner(options().scanTagName());
            }
            if (isStreaming) {
                return new ContinuousFromSnapshotStartingScanner(options.scanSnapshotId());
            }
            return new StaticFromSnapshotStartingScanner(options.scanSnapshotId());

        case FROM_SNAPSHOT_FULL:
            if (isStreaming) {
                return new ContinuousFromSnapshotFullStartingScanner(options.scanSnapshotId());
            }
            return new StaticFromSnapshotStartingScanner(options.scanSnapshotId());

        case INCREMENTAL:
            checkArgument(!isStreaming, "Cannot read incremental in streaming mode.");
            Pair<String, String> range = options.incrementalBetween();
            boolean betweenConfigured =
                    options.toMap().get(CoreOptions.INCREMENTAL_BETWEEN.key()) != null;
            String lower = range.getLeft();
            String upper = range.getRight();
            if (!betweenConfigured) {
                // The INCREMENTAL_BETWEEN key is absent: both bounds are handed to the
                // timestamp-based scanner.
                return new IncrementalTimeStampStartingScanner(
                        Long.parseLong(lower), Long.parseLong(upper));
            }
            try {
                // Numeric bounds go to the snapshot-based incremental scanner ...
                return new IncrementalStartingScanner(
                        Long.parseLong(lower), Long.parseLong(upper));
            } catch (NumberFormatException notNumeric) {
                // ... anything else is passed on as a pair of tag names.
                return new IncrementalTagStartingScanner(lower, upper);
            }

        default:
            throw new UnsupportedOperationException("Unknown startup mode " + mode.name());
    }
}