in fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/source/FlinkTableSource.java [189:295]
/**
 * Creates the runtime provider that reads this table.
 *
 * <p>Two paths exist:
 *
 * <ul>
 *   <li>If a query was pushed down into this source ({@code singleRowFilter} is set,
 *       {@code limit > 0}, or {@code selectRowCount} is set), that query is executed right here,
 *       during this call, and its results are emitted as a bounded in-memory stream.
 *   <li>Otherwise a {@link FlinkSource} is built over the (optionally projected) table, starting
 *       from the offsets selected by the configured startup mode. In streaming mode it is
 *       returned directly. In batch mode it is wrapped in a provider that reports itself as
 *       bounded and only hands out the source when the query can be served (see
 *       {@link #createBatchSourceProvider(FlinkSource)}).
 * </ul>
 *
 * @param scanContext context used to create the produced type information
 * @return the scan runtime provider for this table
 * @throws IllegalArgumentException if the configured startup mode is not supported
 */
public ScanRuntimeProvider getScanRuntimeProvider(ScanContext scanContext) {
    // Pushed-down queries (point lookup, limit, row count) are answered eagerly.
    if (singleRowFilter != null || limit > 0 || selectRowCount) {
        return createPushdownResultProvider(scanContext);
    }

    // Normal scan through a FlinkSource.
    RowType flussRowType = FlinkConversions.toFlussRowType(tableOutputType);
    if (projectedFields != null) {
        flussRowType = flussRowType.project(projectedFields);
    }
    // Resolved before constructing the source so an unsupported startup mode fails first.
    OffsetsInitializer offsetsInitializer = createOffsetsInitializer();
    FlinkSource<RowData> source =
            new FlinkSource<>(
                    flussConfig,
                    tablePath,
                    hasPrimaryKey(),
                    isPartitioned(),
                    flussRowType,
                    projectedFields,
                    offsetsInitializer,
                    scanPartitionDiscoveryIntervalMs,
                    new RowDataDeserializationSchema(),
                    streaming);
    if (streaming) {
        return SourceProvider.of(source);
    }
    return createBatchSourceProvider(source);
}

/**
 * Executes the query that was pushed down into this source and wraps its results in a bounded
 * {@link DataStreamScanProvider}.
 *
 * <p>Exactly one pushed-down query runs, chosen in this order of precedence:
 *
 * <ol>
 *   <li>the single-row lookup ({@code singleRowFilter});
 *   <li>the limit scan ({@code limit > 0});
 *   <li>the row count.
 * </ol>
 *
 * Callers must only invoke this when at least one of {@code singleRowFilter},
 * {@code limit > 0} or {@code selectRowCount} holds. The final branch assumes a row count was
 * requested.
 */
private ScanRuntimeProvider createPushdownResultProvider(ScanContext scanContext) {
    final Collection<RowData> results;
    if (singleRowFilter != null) {
        results =
                PushdownUtils.querySingleRow(
                        singleRowFilter,
                        tablePath,
                        flussConfig,
                        tableOutputType,
                        primaryKeyIndexes,
                        lookupMaxRetryTimes,
                        projectedFields);
    } else if (limit > 0) {
        results =
                PushdownUtils.limitScan(
                        tablePath, flussConfig, tableOutputType, projectedFields, limit);
    } else {
        // Row count: a single row with one field holding PushdownUtils.countLogTable's result.
        results =
                Collections.singleton(
                        GenericRowData.of(PushdownUtils.countLogTable(tablePath, flussConfig)));
    }
    TypeInformation<RowData> resultTypeInfo = scanContext.createTypeInformation(producedDataType);
    return new DataStreamScanProvider() {
        @Override
        public DataStream<RowData> produceDataStream(
                ProviderContext providerContext, StreamExecutionEnvironment execEnv) {
            return execEnv.fromCollection(results, resultTypeInfo);
        }

        @Override
        public boolean isBounded() {
            return true;
        }
    };
}

/**
 * Maps the configured startup mode to the {@link OffsetsInitializer} the source starts from.
 *
 * @throws IllegalArgumentException if the startup mode is not one of EARLIEST, LATEST, FULL or
 *     TIMESTAMP
 */
private OffsetsInitializer createOffsetsInitializer() {
    switch (startupOptions.startupMode) {
        case EARLIEST:
            return OffsetsInitializer.earliest();
        case LATEST:
            return OffsetsInitializer.latest();
        case FULL:
            return OffsetsInitializer.full();
        case TIMESTAMP:
            return OffsetsInitializer.timestamp(startupOptions.startupTimestampMs);
        default:
            throw new IllegalArgumentException(
                    "Unsupported startup mode: " + startupOptions.startupMode);
    }
}

/**
 * Wraps {@code source} for batch execution mode.
 *
 * <p>The returned provider always reports itself as bounded, which the planner requires in
 * batch mode. Creating the source throws {@link UnsupportedOperationException} in either of
 * these cases:
 *
 * <ul>
 *   <li>a modification scan type is set (such statements are only supported with conditions
 *       on the primary key);
 *   <li>the data lake is not enabled for the table.
 * </ul>
 *
 * Otherwise {@code source} is returned. The two fields are read when {@code createSource()} is
 * called, not when this provider is built.
 */
private ScanRuntimeProvider createBatchSourceProvider(FlinkSource<RowData> source) {
    return new SourceProvider() {
        @Override
        public boolean isBounded() {
            return true;
        }

        @Override
        public Source<RowData, ?, ?> createSource() {
            if (modificationScanType != null) {
                throw new UnsupportedOperationException(
                        "Currently, Fluss table only supports "
                                + modificationScanType
                                + " statement with conditions on primary key.");
            }
            if (!isDataLakeEnabled) {
                throw new UnsupportedOperationException(
                        "Currently, Fluss only support queries on table with datalake enabled or point queries on primary key when it's in batch execution mode.");
            }
            return source;
        }
    };
}