in paimon-core/src/main/java/org/apache/paimon/table/source/AbstractDataTableScan.java [131:227]
protected StartingScanner createStartingScanner(boolean isStreaming) {
SnapshotManager snapshotManager = snapshotReader.snapshotManager();
ChangelogManager changelogManager = snapshotReader.changelogManager();
CoreOptions.StreamScanMode type =
options.toConfiguration().get(CoreOptions.STREAM_SCAN_MODE);
switch (type) {
case COMPACT_BUCKET_TABLE:
checkArgument(
isStreaming, "Set 'streaming-compact' in batch mode. This is unexpected.");
return new ContinuousCompactorStartingScanner(snapshotManager);
case FILE_MONITOR:
return new FullStartingScanner(snapshotManager);
}
// read from consumer id
String consumerId = options.consumerId();
if (isStreaming && consumerId != null && !options.consumerIgnoreProgress()) {
ConsumerManager consumerManager = snapshotReader.consumerManager();
Optional<Consumer> consumer = consumerManager.consumer(consumerId);
if (consumer.isPresent()) {
return new ContinuousFromSnapshotStartingScanner(
snapshotManager,
changelogManager,
consumer.get().nextSnapshot(),
options.changelogLifecycleDecoupled());
}
}
CoreOptions.StartupMode startupMode = options.startupMode();
switch (startupMode) {
case LATEST_FULL:
return new FullStartingScanner(snapshotManager);
case LATEST:
return isStreaming
? new ContinuousLatestStartingScanner(snapshotManager)
: new FullStartingScanner(snapshotManager);
case COMPACTED_FULL:
if (options.changelogProducer() == ChangelogProducer.FULL_COMPACTION
|| options.toConfiguration().contains(FULL_COMPACTION_DELTA_COMMITS)) {
int deltaCommits =
options.toConfiguration()
.getOptional(FULL_COMPACTION_DELTA_COMMITS)
.orElse(1);
return new FullCompactedStartingScanner(snapshotManager, deltaCommits);
} else {
return new CompactedStartingScanner(snapshotManager);
}
case FROM_TIMESTAMP:
Long startupMillis = options.scanTimestampMills();
return isStreaming
? new ContinuousFromTimestampStartingScanner(
snapshotManager,
changelogManager,
startupMillis,
options.changelogLifecycleDecoupled())
: new StaticFromTimestampStartingScanner(snapshotManager, startupMillis);
case FROM_FILE_CREATION_TIME:
Long fileCreationTimeMills = options.scanFileCreationTimeMills();
return new FileCreationTimeStartingScanner(snapshotManager, fileCreationTimeMills);
case FROM_SNAPSHOT:
if (options.scanSnapshotId() != null) {
return isStreaming
? new ContinuousFromSnapshotStartingScanner(
snapshotManager,
changelogManager,
options.scanSnapshotId(),
options.changelogLifecycleDecoupled())
: new StaticFromSnapshotStartingScanner(
snapshotManager, options.scanSnapshotId());
} else if (options.scanWatermark() != null) {
checkArgument(!isStreaming, "Cannot scan from watermark in streaming mode.");
return new StaticFromWatermarkStartingScanner(
snapshotManager, options().scanWatermark());
} else if (options.scanTagName() != null) {
checkArgument(!isStreaming, "Cannot scan from tag in streaming mode.");
return new StaticFromTagStartingScanner(
snapshotManager, options().scanTagName());
} else {
throw new UnsupportedOperationException("Unknown snapshot read mode");
}
case FROM_SNAPSHOT_FULL:
Long scanSnapshotId = options.scanSnapshotId();
checkNotNull(
scanSnapshotId,
"scan.snapshot-id must be set when startupMode is FROM_SNAPSHOT_FULL.");
return isStreaming
? new ContinuousFromSnapshotFullStartingScanner(
snapshotManager, scanSnapshotId)
: new StaticFromSnapshotStartingScanner(snapshotManager, scanSnapshotId);
case INCREMENTAL:
checkArgument(!isStreaming, "Cannot read incremental in streaming mode.");
return createIncrementalStartingScanner(snapshotManager);
default:
throw new UnsupportedOperationException(
"Unknown startup mode " + startupMode.name());
}
}