public static Collection&lt;RowData&gt; limitScan(TablePath, Configuration, RowType, int[], long)

in fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/utils/PushdownUtils.java [295:365]


    /**
     * Performs a LIMIT-pushdown scan over every bucket of the given table and returns the rows
     * converted to Flink {@link RowData}.
     *
     * <p>For a partitioned table, every bucket of every partition is scanned; each bucket scanner
     * is created with the same per-bucket limit, and the merged result is capped at the requested
     * row count before conversion.
     *
     * @param tablePath path of the table to scan
     * @param flussConfig configuration used to open the Fluss connection
     * @param sourceOutputType full Flink row type produced by the source (before projection)
     * @param projectedFields indexes of the projected columns, or {@code null} for no projection
     * @param limitRowNum the LIMIT value requested by the query
     * @return at most {@code limitRowNum} rows as Flink {@link RowData}
     * @throws UnsupportedOperationException if {@code limitRowNum} exceeds MAX_LIMIT_PUSHDOWN
     * @throws FlussRuntimeException if the scan fails for any other reason
     */
    public static Collection<RowData> limitScan(
            TablePath tablePath,
            Configuration flussConfig,
            RowType sourceOutputType,
            @Nullable int[] projectedFields,
            long limitRowNum) {
        if (limitRowNum > MAX_LIMIT_PUSHDOWN) {
            throw new UnsupportedOperationException(
                    String.format(
                            "LIMIT statement doesn't support greater than %s", MAX_LIMIT_PUSHDOWN));
        }
        int limit = (int) limitRowNum;
        try (Connection connection = ConnectionFactory.createConnection(flussConfig);
                Table table = connection.getTable(tablePath);
                Admin flussAdmin = connection.getAdmin()) {
            TableInfo tableInfo = flussAdmin.getTableInfo(tablePath).get();
            long tableId = tableInfo.getTableId();
            int numBuckets = tableInfo.getNumBuckets();

            // Enumerate every bucket to scan; a partitioned table contributes one full bucket set
            // per partition.
            List<TableBucket> bucketsToScan = new ArrayList<>();
            if (tableInfo.isPartitioned()) {
                for (PartitionInfo partitionInfo : flussAdmin.listPartitionInfos(tablePath).get()) {
                    for (int bucketId = 0; bucketId < numBuckets; bucketId++) {
                        bucketsToScan.add(
                                new TableBucket(tableId, partitionInfo.getPartitionId(), bucketId));
                    }
                }
            } else {
                for (int bucketId = 0; bucketId < numBuckets; bucketId++) {
                    bucketsToScan.add(new TableBucket(tableId, bucketId));
                }
            }

            // One batch scanner per bucket, each carrying the same limit and projection.
            Scan scan = table.newScan().limit(limit).project(projectedFields);
            List<BatchScanner> scanners = new ArrayList<>(bucketsToScan.size());
            for (TableBucket bucket : bucketsToScan) {
                scanners.add(scan.createBatchScanner(bucket));
            }
            List<InternalRow> scannedRows = BatchScanUtils.collectLimitedRows(scanners, limit);

            // Convert Fluss rows into Flink rows, applying the projection to the converter's row
            // type when one was pushed down.
            FlussRowToFlinkRowConverter converter =
                    new FlussRowToFlinkRowConverter(
                            projectedFields == null
                                    ? FlinkConversions.toFlussRowType(sourceOutputType)
                                    : FlinkConversions.toFlussRowType(sourceOutputType)
                                            .project(projectedFields));

            // Cap the converted output at the limit (collectLimitedRows should already enforce
            // this, but the cap keeps the contract explicit).
            List<RowData> flinkRows = new ArrayList<>();
            for (InternalRow scannedRow : scannedRows) {
                flinkRows.add(converter.toFlinkRowData(scannedRow));
                if (flinkRows.size() >= limit) {
                    break;
                }
            }
            return flinkRows;
        } catch (Exception e) {
            throw new FlussRuntimeException(e);
        }
    }