protected StartingScanner createStartingScanner()

in paimon-core/src/main/java/org/apache/paimon/table/source/AbstractInnerTableScan.java [79:164]


    /**
     * Chooses the {@link StartingScanner} for this scan from the table options.
     *
     * <p>The decision is made in three steps, the first match wins:
     *
     * <ol>
     *   <li>the {@code streaming-compact} option: {@code NORMAL} and {@code BUCKET_UNAWARE} each
     *       map directly to a dedicated scanner;
     *   <li>a configured consumer id for which the consumer manager already holds a consumer:
     *       scanning continues from that consumer's next snapshot;
     *   <li>the configured startup mode.
     * </ol>
     *
     * @param isStreaming whether the scan runs in streaming mode; several startup modes return a
     *     continuous scanner when this is true and a static/full one otherwise, and some modes
     *     are rejected via {@code checkArgument} depending on this flag
     * @return the scanner selected by the options
     * @throws UnsupportedOperationException if the startup mode is not one of the handled values
     */
    protected StartingScanner createStartingScanner(boolean isStreaming) {
        CoreOptions.StreamingCompactionType compactionType =
                options.toConfiguration().get(CoreOptions.STREAMING_COMPACT);
        switch (compactionType) {
            case NORMAL:
                checkArgument(
                        isStreaming,
                        "Set 'streaming-compact' in batch mode. This is unexpected.");
                return new ContinuousCompactorStartingScanner();
            case BUCKET_UNAWARE:
                return new FullStartingScanner();
            default:
                // every other compaction type is resolved by the steps below
                break;
        }

        // resume from a recorded consumer, if one exists for the configured id
        String consumerId = options.consumerId();
        if (consumerId != null) {
            Optional<Consumer> recorded = snapshotReader.consumerManager().consumer(consumerId);
            if (recorded.isPresent()) {
                return new ContinuousFromSnapshotStartingScanner(recorded.get().nextSnapshot());
            }
        }

        CoreOptions.StartupMode mode = options.startupMode();
        switch (mode) {
            case LATEST_FULL:
                return new FullStartingScanner();
            case LATEST:
                if (isStreaming) {
                    return new ContinuousLatestStartingScanner();
                }
                return new FullStartingScanner();
            case COMPACTED_FULL:
                {
                    if (options.changelogProducer() != ChangelogProducer.FULL_COMPACTION
                            && !options.toConfiguration()
                                    .contains(FULL_COMPACTION_DELTA_COMMITS)) {
                        return new CompactedStartingScanner();
                    }
                    // delta commits fall back to 1 when the option is not set explicitly
                    int deltaCommits =
                            options.toConfiguration()
                                    .getOptional(FULL_COMPACTION_DELTA_COMMITS)
                                    .orElse(1);
                    return new FullCompactedStartingScanner(deltaCommits);
                }
            case FROM_TIMESTAMP:
                {
                    Long timestampMillis = options.scanTimestampMills();
                    if (isStreaming) {
                        return new ContinuousFromTimestampStartingScanner(timestampMillis);
                    }
                    return new StaticFromTimestampStartingScanner(timestampMillis);
                }
            case FROM_SNAPSHOT:
                if (options.scanSnapshotId() == null) {
                    // no snapshot id configured: scan from the configured tag name instead
                    checkArgument(!isStreaming, "Cannot scan from tag in streaming mode.");
                    return new StaticFromTagStartingScanner(options().scanTagName());
                }
                if (isStreaming) {
                    return new ContinuousFromSnapshotStartingScanner(options.scanSnapshotId());
                }
                return new StaticFromSnapshotStartingScanner(options.scanSnapshotId());
            case FROM_SNAPSHOT_FULL:
                if (isStreaming) {
                    return new ContinuousFromSnapshotFullStartingScanner(
                            options.scanSnapshotId());
                }
                return new StaticFromSnapshotStartingScanner(options.scanSnapshotId());
            case INCREMENTAL:
                checkArgument(!isStreaming, "Cannot read incremental in streaming mode.");
                return createIncrementalStartingScanner();
            default:
                throw new UnsupportedOperationException(
                        "Unknown startup mode " + mode.name());
        }
    }

    /**
     * Builds the scanner for the {@code INCREMENTAL} startup mode from the pair returned by
     * {@code options.incrementalBetween()}.
     *
     * <p>When the {@code INCREMENTAL_BETWEEN} key is absent from the option map, both bounds are
     * parsed as longs and passed to {@link IncrementalTimeStampStartingScanner}. When the key is
     * present, both bounds are first tried as longs for {@link IncrementalStartingScanner}; if
     * that raises a {@link NumberFormatException}, the raw strings are passed to {@link
     * IncrementalTagStartingScanner}.
     */
    private StartingScanner createIncrementalStartingScanner() {
        Pair<String, String> range = options.incrementalBetween();
        if (options.toMap().get(CoreOptions.INCREMENTAL_BETWEEN.key()) == null) {
            return new IncrementalTimeStampStartingScanner(
                    Long.parseLong(range.getLeft()), Long.parseLong(range.getRight()));
        }
        try {
            return new IncrementalStartingScanner(
                    Long.parseLong(range.getLeft()), Long.parseLong(range.getRight()));
        } catch (NumberFormatException notNumeric) {
            // a bound is not a valid long, so hand both bounds over unparsed
            return new IncrementalTagStartingScanner(range.getLeft(), range.getRight());
        }
    }