in fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/utils/PushdownUtils.java [295:365]
/**
 * Executes a LIMIT query eagerly by scanning every bucket of the given table and returning at
 * most {@code limitRowNum} rows converted to Flink {@link RowData}.
 *
 * <p>For a partitioned table, one {@link TableBucket} is built per (partition, bucket id) pair
 * using the partitions currently listed by the admin client; otherwise one per bucket id. A
 * batch scanner is created for each bucket and the rows are gathered through {@code
 * BatchScanUtils.collectLimitedRows}.
 *
 * @param tablePath path of the table to scan
 * @param flussConfig configuration used to open the Fluss connection
 * @param sourceOutputType Flink row type that is converted to a Fluss row type (and projected
 *     with {@code projectedFields} when those are given) to build the row converter; presumably
 *     the un-projected table row type, verify against callers
 * @param projectedFields field indexes handed to the scan projection, or {@code null} when no
 *     projection is applied to the converter row type
 * @param limitRowNum maximum number of rows to return
 * @return the converted rows, never more than {@code limitRowNum}; empty when {@code
 *     limitRowNum} is not positive
 * @throws UnsupportedOperationException if {@code limitRowNum} exceeds {@code
 *     MAX_LIMIT_PUSHDOWN}
 * @throws FlussRuntimeException wrapping any exception raised while connecting, fetching
 *     metadata, scanning or closing the resources
 */
public static Collection<RowData> limitScan(
        TablePath tablePath,
        Configuration flussConfig,
        RowType sourceOutputType,
        @Nullable int[] projectedFields,
        long limitRowNum) {
    if (limitRowNum > MAX_LIMIT_PUSHDOWN) {
        throw new UnsupportedOperationException(
                String.format(
                        "LIMIT statement doesn't support greater than %s", MAX_LIMIT_PUSHDOWN));
    }
    // NOTE(review): narrowing is lossless only if MAX_LIMIT_PUSHDOWN fits in an int; the
    // constant is declared outside this block - confirm.
    int limit = (int) limitRowNum;
    try (Connection connection = ConnectionFactory.createConnection(flussConfig);
            Table table = connection.getTable(tablePath);
            Admin flussAdmin = connection.getAdmin()) {
        TableInfo tableInfo = flussAdmin.getTableInfo(tablePath).get();
        int bucketCount = tableInfo.getNumBuckets();

        // enumerate every bucket that has to be scanned
        List<TableBucket> tableBuckets;
        if (tableInfo.isPartitioned()) {
            List<PartitionInfo> partitionInfos = flussAdmin.listPartitionInfos(tablePath).get();
            tableBuckets =
                    partitionInfos.stream()
                            .flatMap(
                                    partitionInfo ->
                                            IntStream.range(0, bucketCount)
                                                    .mapToObj(
                                                            bucketId ->
                                                                    new TableBucket(
                                                                            tableInfo
                                                                                    .getTableId(),
                                                                            partitionInfo
                                                                                    .getPartitionId(),
                                                                            bucketId)))
                            .collect(Collectors.toList());
        } else {
            tableBuckets =
                    IntStream.range(0, bucketCount)
                            .mapToObj(
                                    bucketId ->
                                            new TableBucket(tableInfo.getTableId(), bucketId))
                            .collect(Collectors.toList());
        }

        Scan scan = table.newScan().limit(limit).project(projectedFields);
        List<BatchScanner> scanners =
                tableBuckets.stream()
                        .map(scan::createBatchScanner)
                        .collect(Collectors.toList());
        List<InternalRow> scannedRows = BatchScanUtils.collectLimitedRows(scanners, limit);

        // convert fluss row into flink row
        FlussRowToFlinkRowConverter flussRowToFlinkRowConverter =
                new FlussRowToFlinkRowConverter(
                        projectedFields != null
                                ? FlinkConversions.toFlussRowType(sourceOutputType)
                                        .project(projectedFields)
                                : FlinkConversions.toFlussRowType(sourceOutputType));
        List<RowData> flinkRows = new ArrayList<>(scannedRows.size());
        for (InternalRow row : scannedRows) {
            // Check the cap before adding so that a non-positive limit yields no rows and the
            // result never exceeds the requested limit.
            if (flinkRows.size() >= limit) {
                break;
            }
            flinkRows.add(flussRowToFlinkRowConverter.toFlinkRowData(row));
        }
        return flinkRows;
    } catch (Exception e) {
        if (e instanceof InterruptedException) {
            // Restore the interrupt status so callers can still observe the cancellation
            // after the exception has been wrapped.
            Thread.currentThread().interrupt();
        }
        throw new FlussRuntimeException(e);
    }
}