public ScanRuntimeProvider getScanRuntimeProvider()

in fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/source/FlinkTableSource.java [189:295]


    /**
     * Builds the runtime provider used to read this table.
     *
     * <p>Two paths are possible:
     *
     * <ul>
     *   <li><b>Pushdown path</b> - taken when a single-row filter is set, {@code limit > 0}, or
     *       {@code selectRowCount} is true. The rows are obtained through {@code PushdownUtils}
     *       while this method runs and are exposed as a bounded, collection-backed stream.
     *   <li><b>Regular path</b> - a {@code FlinkSource} is configured from the startup mode and
     *       the (optionally projected) row type. When {@code streaming} is true it is wrapped
     *       directly; otherwise a bounded provider is returned whose {@code createSource()}
     *       rejects modification scans and tables without datalake enabled.
     * </ul>
     *
     * @param scanContext used to create the type information for the produced rows on the
     *     pushdown path
     * @return the provider for the selected path
     * @throws IllegalArgumentException if the configured startup mode is not one of the modes
     *     handled here
     */
    public ScanRuntimeProvider getScanRuntimeProvider(ScanContext scanContext) {
        final boolean answeredByPushdown =
                singleRowFilter != null || limit > 0 || selectRowCount;

        if (answeredByPushdown) {
            // Precedence when several pushdowns apply: single-row lookup, then limit, then count.
            final Collection<RowData> pushdownRows;
            if (singleRowFilter != null) {
                pushdownRows =
                        PushdownUtils.querySingleRow(
                                singleRowFilter,
                                tablePath,
                                flussConfig,
                                tableOutputType,
                                primaryKeyIndexes,
                                lookupMaxRetryTimes,
                                projectedFields);
            } else if (limit > 0) {
                pushdownRows =
                        PushdownUtils.limitScan(
                                tablePath, flussConfig, tableOutputType, projectedFields, limit);
            } else {
                // Only selectRowCount can be true here: emit one row holding the count.
                pushdownRows =
                        Collections.singleton(
                                GenericRowData.of(
                                        PushdownUtils.countLogTable(tablePath, flussConfig)));
            }

            final TypeInformation<RowData> rowTypeInfo =
                    scanContext.createTypeInformation(producedDataType);

            // The rows are already materialized, so the resulting stream is always bounded.
            return new DataStreamScanProvider() {
                @Override
                public boolean isBounded() {
                    return true;
                }

                @Override
                public DataStream<RowData> produceDataStream(
                        ProviderContext context, StreamExecutionEnvironment env) {
                    return env.fromCollection(pushdownRows, rowTypeInfo);
                }
            };
        }

        // Regular scan: derive the Fluss row type, narrowed to the projection when one is set.
        final RowType fullRowType = FlinkConversions.toFlussRowType(tableOutputType);
        final RowType scanRowType =
                projectedFields == null ? fullRowType : fullRowType.project(projectedFields);

        // Map the configured startup mode onto the matching offsets initializer.
        OffsetsInitializer startingOffsets;
        switch (startupOptions.startupMode) {
            case EARLIEST:
                startingOffsets = OffsetsInitializer.earliest();
                break;
            case LATEST:
                startingOffsets = OffsetsInitializer.latest();
                break;
            case FULL:
                startingOffsets = OffsetsInitializer.full();
                break;
            case TIMESTAMP:
                startingOffsets = OffsetsInitializer.timestamp(startupOptions.startupTimestampMs);
                break;
            default:
                throw new IllegalArgumentException(
                        "Unsupported startup mode: " + startupOptions.startupMode);
        }

        final FlinkSource<RowData> flinkSource =
                new FlinkSource<>(
                        flussConfig,
                        tablePath,
                        hasPrimaryKey(),
                        isPartitioned(),
                        scanRowType,
                        projectedFields,
                        startingOffsets,
                        scanPartitionDiscoveryIntervalMs,
                        new RowDataDeserializationSchema(),
                        streaming);

        if (streaming) {
            return SourceProvider.of(flinkSource);
        }

        // Batch mode: always hand back a bounded provider, and defer rejection of unsupported
        // cases until the source is actually requested through createSource().
        return new SourceProvider() {
            @Override
            public Source<RowData, ?, ?> createSource() {
                if (modificationScanType != null) {
                    throw new UnsupportedOperationException(
                            "Currently, Fluss table only supports "
                                    + modificationScanType
                                    + " statement with conditions on primary key.");
                }
                if (!isDataLakeEnabled) {
                    throw new UnsupportedOperationException(
                            "Currently, Fluss only support queries on table with datalake enabled or point queries on primary key when it's in batch execution mode.");
                }
                return flinkSource;
            }

            @Override
            public boolean isBounded() {
                return true;
            }
        };
    }