public static DataStream sortStreamByKey()

in paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sorter/SortUtils.java [84:216]
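Attaches a shuffle key to every record, range-shuffles the stream so that each sink task receives a contiguous key range, optionally sorts each partition locally with SortOperator, and finally strips the key columns so the output keeps the original RowData layout.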


    public static <KEY> DataStream<RowData> sortStreamByKey(
            final DataStream<RowData> inputStream,
            final FileStoreTable table,
            final RowType sortKeyType,
            final TypeInformation<KEY> keyTypeInformation,
            final SerializableSupplier<Comparator<KEY>> shuffleKeyComparator,
            final KeyAbstract<KEY> shuffleKeyAbstract,
            final ShuffleKeyConvertor<KEY> convertor,
            final TableSortInfo tableSortInfo) {
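        // Overall flow: extract a shuffle KEY per record, range-shuffle so that each sink
        // task receives a contiguous key range, then either sort every partition locally and
        // drop the key columns, or only drop the key columns.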

        final RowType valueRowType = table.rowType();
        CoreOptions options = table.coreOptions();
        final int sinkParallelism = tableSortInfo.getSinkParallelism();
        final int localSampleSize = tableSortInfo.getLocalSampleSize();
        final int globalSampleSize = tableSortInfo.getGlobalSampleSize();
        final int rangeNum = tableSortInfo.getRangeNumber();
        int keyFieldCount = sortKeyType.getFieldCount();
        int valueFieldCount = valueRowType.getFieldCount();
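        // projection that keeps only the value columns of the joined (key + value) row,
        // used at the end to strip the sort key again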
        final int[] valueProjectionMap = new int[valueFieldCount];
        for (int i = 0; i < valueFieldCount; i++) {
            valueProjectionMap[i] = i + keyFieldCount;
        }

        List<DataField> keyFields = sortKeyType.getFields();
        List<DataField> dataFields = valueRowType.getFields();

        List<DataField> fields = new ArrayList<>();
        fields.addAll(keyFields);
        fields.addAll(dataFields);
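        // the joined (key + value) row layout used between the shuffle and the final projection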
        final RowType longRowType = new RowType(fields);
        final InternalTypeInfo<InternalRow> internalRowType =
                InternalTypeInfo.fromRowType(longRowType);

        // extract the shuffle KEY for every record and pair it with the original row
        DataStream<Tuple2<KEY, RowData>> inputWithKey =
                inputStream
                        .map(
                                new RichMapFunction<RowData, Tuple2<KEY, RowData>>() {

                                    /**
                                     * Do not annotate with <code>@Override</code> here to keep
                                     * compiling against Flink 1.18 and earlier, which do not have
                                     * {@code open(OpenContext)}.
                                     */
                                    public void open(OpenContext openContext) throws Exception {
                                        open(new Configuration());
                                    }

                                    /**
                                     * Do not annotate with <code>@Override</code> here to keep
                                     * compiling against Flink 2.0+, where {@code
                                     * open(Configuration)} has been removed.
                                     */
                                    public void open(Configuration parameters) throws Exception {
                                        shuffleKeyAbstract.open();
                                    }

                                    @Override
                                    public Tuple2<KEY, RowData> map(RowData value) {
                                        return Tuple2.of(shuffleKeyAbstract.apply(value), value);
                                    }
                                },
                                new TupleTypeInfo<>(keyTypeInformation, inputStream.getType()))
                        .setParallelism(inputStream.getParallelism());

        // range shuffle by key: sample the keys (locally, then globally) and repartition the
        // records into contiguous key ranges across the sink tasks
        DataStream<Tuple2<KEY, RowData>> rangeShuffleResult =
                RangeShuffle.rangeShuffleByKey(
                        inputWithKey,
                        shuffleKeyComparator,
                        keyTypeInformation,
                        localSampleSize,
                        globalSampleSize,
                        rangeNum,
                        sinkParallelism,
                        valueRowType,
                        options.sortBySize());
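        // with sort-in-cluster enabled, every sink task additionally sorts its key range
        // locally before the key columns are dropped; otherwise only the key is removed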
        if (tableSortInfo.isSortInCluster()) {
            return rangeShuffleResult
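                    // prepend the converted shuffle key to every row so that the local sort
                    // can compare on the key columns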
                    .map(
                            a -> new JoinedRow(convertor.apply(a.f0), new FlinkRowWrapper(a.f1)),
                            internalRowType)
                    .setParallelism(sinkParallelism)
                    // sort each partition locally with `SortOperator`, spilling to disk when
                    // the write buffer fills up
                    .transform(
                            "LOCAL SORT",
                            internalRowType,
                            new SortOperator(
                                    sortKeyType,
                                    longRowType,
                                    options.writeBufferSize(),
                                    options.pageSize(),
                                    options.localSortMaxNumFileHandles(),
                                    options.spillCompressOptions(),
                                    sinkParallelism,
                                    options.writeBufferSpillDiskSize(),
                                    options.sequenceFieldSortOrderIsAscending()))
                    .setParallelism(sinkParallelism)
                    // project away the sort key columns, keeping only the original value columns
                    .map(
                            new RichMapFunction<InternalRow, InternalRow>() {

                                private transient KeyProjectedRow keyProjectedRow;

                                /**
                                 * Do not annotate with <code>@Override</code> here to keep
                                 * compiling against Flink 1.18 and earlier, which do not have
                                 * {@code open(OpenContext)}.
                                 */
                                public void open(OpenContext openContext) {
                                    open(new Configuration());
                                }

                                /**
                                 * Do not annotate with <code>@Override</code> here to keep
                                 * compiling against Flink 2.0+, where {@code
                                 * open(Configuration)} has been removed.
                                 */
                                public void open(Configuration parameters) {
                                    keyProjectedRow = new KeyProjectedRow(valueProjectionMap);
                                }

                                @Override
                                public InternalRow map(InternalRow value) {
                                    return keyProjectedRow.replaceRow(value);
                                }
                            },
                            InternalTypeInfo.fromRowType(valueRowType))
                    .setParallelism(sinkParallelism)
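                    // wrap Paimon's InternalRow back into Flink's RowData, restoring the
                    // original input type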
                    .map(FlinkRowData::new, inputStream.getType())
                    .setParallelism(sinkParallelism);
        } else {
            return rangeShuffleResult
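                    // no local sort: simply drop the shuffle key and emit the rows in their
                    // original RowData form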
                    .transform("REMOVE KEY", inputStream.getType(), new RemoveKeyOperator<>())
                    .setParallelism(sinkParallelism);
        }
    }