in paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sorter/SortUtils.java [84:216]
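/**
 * Sorts {@code inputStream} by the given sort key: each record is paired with
 * an extracted shuffle key, records are range-partitioned across the sink
 * parallelism using sampled key ranges, and the key is removed again before
 * emitting. If {@link TableSortInfo#isSortInCluster()} is true, every subtask
 * additionally sorts its own key range locally via {@code SortOperator}.
 */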
public static <KEY> DataStream<RowData> sortStreamByKey(
final DataStream<RowData> inputStream,
final FileStoreTable table,
final RowType sortKeyType,
final TypeInformation<KEY> keyTypeInformation,
final SerializableSupplier<Comparator<KEY>> shuffleKeyComparator,
final KeyAbstract<KEY> shuffleKeyAbstract,
final ShuffleKeyConvertor<KEY> convertor,
final TableSortInfo tableSortInfo) {
final RowType valueRowType = table.rowType();
CoreOptions options = table.coreOptions();
final int sinkParallelism = tableSortInfo.getSinkParallelism();
final int localSampleSize = tableSortInfo.getLocalSampleSize();
final int globalSampleSize = tableSortInfo.getGlobalSampleSize();
final int rangeNum = tableSortInfo.getRangeNumber();
int keyFieldCount = sortKeyType.getFieldCount();
int valueFieldCount = valueRowType.getFieldCount();
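// projection that later strips the leading sort-key columns: value field i
// lives at offset keyFieldCount + i of the joined (key ++ value) row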
final int[] valueProjectionMap = new int[valueFieldCount];
for (int i = 0; i < valueFieldCount; i++) {
valueProjectionMap[i] = i + keyFieldCount;
}
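// the joined ("long") row type: sort key fields first, then all value fields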
List<DataField> keyFields = sortKeyType.getFields();
List<DataField> dataFields = valueRowType.getFields();
List<DataField> fields = new ArrayList<>();
fields.addAll(keyFields);
fields.addAll(dataFields);
final RowType longRowType = new RowType(fields);
final InternalTypeInfo<InternalRow> internalRowType =
InternalTypeInfo.fromRowType(longRowType);
// extract the shuffle KEY from each record and pair it with the original row
DataStream<Tuple2<KEY, RowData>> inputWithKey =
inputStream
.map(
new RichMapFunction<RowData, Tuple2<KEY, RowData>>() {
/**
* Do not annotate with <code>@Override</code> here to maintain
* compatibility with Flink 1.18-.
*/
public void open(OpenContext openContext) throws Exception {
open(new Configuration());
}
/**
* Do not annotate with <code>@Override</code> here to maintain
* compatibility with Flink 2.0+.
*/
public void open(Configuration parameters) throws Exception {
shuffleKeyAbstract.open();
}
@Override
public Tuple2<KEY, RowData> map(RowData value) {
return Tuple2.of(shuffleKeyAbstract.apply(value), value);
}
},
new TupleTypeInfo<>(keyTypeInformation, inputStream.getType()))
.setParallelism(inputStream.getParallelism());
// range shuffle by key: sample keys (locally, then globally) to derive
// balanced key ranges and partition records across the sink parallelism
DataStream<Tuple2<KEY, RowData>> rangeShuffleResult =
RangeShuffle.rangeShuffleByKey(
inputWithKey,
shuffleKeyComparator,
keyTypeInformation,
localSampleSize,
globalSampleSize,
rangeNum,
sinkParallelism,
valueRowType,
options.sortBySize());
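// two finishing strategies: sort-in-cluster locally sorts each subtask's key
// range (output is globally clustered AND locally sorted); otherwise the
// range clustering is kept as-is and only the shuffle key is removed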
if (tableSortInfo.isSortInCluster()) {
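// merge the shuffle key and the row into one InternalRow (key fields
// first) so SortOperator can compare rows on the key prefix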
return rangeShuffleResult
.map(
a -> new JoinedRow(convertor.apply(a.f0), new FlinkRowWrapper(a.f1)),
internalRowType)
.setParallelism(sinkParallelism)
// sort the output locally by `SortOperator`
.transform(
"LOCAL SORT",
internalRowType,
new SortOperator(
sortKeyType,
longRowType,
options.writeBufferSize(),
options.pageSize(),
options.localSortMaxNumFileHandles(),
options.spillCompressOptions(),
sinkParallelism,
options.writeBufferSpillDiskSize(),
options.sequenceFieldSortOrderIsAscending()))
.setParallelism(sinkParallelism)
// remove the key column from every row
.map(
new RichMapFunction<InternalRow, InternalRow>() {
private transient KeyProjectedRow keyProjectedRow;
/**
* Do not annotate with <code>@Override</code> here to maintain
* compatibility with Flink 1.18-.
*/
public void open(OpenContext openContext) {
open(new Configuration());
}
/**
* Do not annotate with <code>@Override</code> here to maintain
* compatibility with Flink 2.0+.
*/
public void open(Configuration parameters) {
keyProjectedRow = new KeyProjectedRow(valueProjectionMap);
}
@Override
public InternalRow map(InternalRow value) {
return keyProjectedRow.replaceRow(value);
}
},
InternalTypeInfo.fromRowType(valueRowType))
.setParallelism(sinkParallelism)
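// convert Paimon's InternalRow back to Flink's RowData for downstream sinks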
.map(FlinkRowData::new, inputStream.getType())
.setParallelism(sinkParallelism);
} else {
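// no local sort: strip the shuffle key and forward the rows unchanged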
return rangeShuffleResult
.transform("REMOVE KEY", inputStream.getType(), new RemoveKeyOperator<>())
.setParallelism(sinkParallelism);
}
}
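/*
 * Hypothetical caller sketch (not part of this file), assuming an INT sort key
 * read from field 0 of the row; names and literals are illustrative only:
 *
 * DataStream<RowData> sorted =
 *         SortUtils.sortStreamByKey(
 *                 input,                           // DataStream<RowData>
 *                 table,                           // FileStoreTable
 *                 RowType.of(new IntType()),       // sort key row type
 *                 BasicTypeInfo.INT_TYPE_INFO,     // key TypeInformation
 *                 () -> Comparator.naturalOrder(), // shuffle key comparator
 *                 row -> row.getInt(0),            // KeyAbstract: extract key
 *                 GenericRow::of,                  // ShuffleKeyConvertor
 *                 tableSortInfo);
 */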