in flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecRank.java [193:421]
protected Transformation<RowData> translateToPlanInternal(
        PlannerBase planner, ExecNodeConfig config) {
    // Translates this rank node into a keyed one-input transformation:
    //   1. reject every rank type other than ROW_NUMBER,
    //   2. translate the input edge and derive the sort-key selector/comparator,
    //   3. pick the TopN function that matches the rank strategy,
    //   4. wrap it in a (sync or async) keyed process operator,
    //   5. key the resulting transformation by the partition fields.
    switch (rankType) {
        case ROW_NUMBER:
            break;
        case RANK:
            throw new TableException("RANK() on streaming table is not supported currently");
        case DENSE_RANK:
            throw new TableException(
                    "DENSE_RANK() on streaming table is not supported currently");
        default:
            throw new TableException(
                    String.format(
                            "Streaming tables do not support %s rank function.", rankType));
    }

    ExecEdge edge = getInputEdges().get(0);
    Transformation<RowData> upstream = (Transformation<RowData>) edge.translateToPlan(planner);
    RowType inputRowType = (RowType) edge.getOutputType();
    InternalTypeInfo<RowData> inputTypeInfo = InternalTypeInfo.of(inputRowType);

    // The same class loader is handed to every selector / code generator below.
    ClassLoader classLoader = planner.getFlinkContext().getClassLoader();

    int[] sortFieldIndices = sortSpec.getFieldIndices();
    RowDataKeySelector sortKeySelector =
            KeySelectorUtil.getRowDataSelector(classLoader, sortFieldIndices, inputTypeInfo);

    // The comparator is generated for a row type that holds only the sort field types, so the
    // sort spec is rebuilt against positions 0..n-1 while keeping each field's ordering and
    // null placement.
    int[] sortKeyPositions = IntStream.range(0, sortFieldIndices.length).toArray();
    SortSpec.SortSpecBuilder keySpecBuilder = SortSpec.builder();
    for (int pos = 0; pos < sortFieldIndices.length; pos++) {
        keySpecBuilder.addField(
                pos,
                sortSpec.getFieldSpec(pos).getIsAscendingOrder(),
                sortSpec.getFieldSpec(pos).getNullIsLast());
    }
    SortSpec projectedSortSpec = keySpecBuilder.build();
    GeneratedRecordComparator sortKeyComparator =
            ComparatorCodeGenerator.gen(
                    config,
                    classLoader,
                    "StreamExecSortComparator",
                    RowType.of(sortSpec.getFieldTypes(inputRowType)),
                    projectedSortSpec);

    long topNCacheSize = config.get(TABLE_EXEC_RANK_TOPN_CACHE_SIZE);
    long stateTtl = StateMetadata.getStateTtlForOneInputOperator(config, stateMetadataList);
    StateTtlConfig stateTtlConfig = StateConfigUtil.createTtlConfig(stateTtl);
    boolean asyncState = config.get(ExecutionConfigOptions.TABLE_EXEC_ASYNC_STATE_ENABLED);

    AbstractTopNFunction rankFunction;
    if (rankStrategy instanceof RankProcessStrategy.AppendFastStrategy) {
        // Exactly one sort field, of proctime type, in ascending order.
        boolean ascendingProcTimeOnly =
                sortFieldIndices.length == 1
                        && TypeCheckUtils.isProcTime(
                                inputRowType.getChildren().get(sortFieldIndices[0]))
                        && sortSpec.getFieldSpec(0).getIsAscendingOrder();
        if (ascendingProcTimeOnly) {
            // This variant takes no cache size and has no async counterpart in this method.
            rankFunction =
                    new AppendOnlyFirstNFunction(
                            stateTtlConfig,
                            inputTypeInfo,
                            sortKeyComparator,
                            sortKeySelector,
                            rankType,
                            rankRange,
                            generateUpdateBefore,
                            outputRankNumber);
        } else if (RankUtil.isTop1(rankRange)) {
            rankFunction =
                    asyncState
                            ? new AsyncStateFastTop1Function(
                                    stateTtlConfig,
                                    inputTypeInfo,
                                    sortKeyComparator,
                                    sortKeySelector,
                                    rankType,
                                    rankRange,
                                    generateUpdateBefore,
                                    outputRankNumber,
                                    topNCacheSize)
                            : new FastTop1Function(
                                    stateTtlConfig,
                                    inputTypeInfo,
                                    sortKeyComparator,
                                    sortKeySelector,
                                    rankType,
                                    rankRange,
                                    generateUpdateBefore,
                                    outputRankNumber,
                                    topNCacheSize);
        } else {
            rankFunction =
                    asyncState
                            ? new AsyncStateAppendOnlyTopNFunction(
                                    stateTtlConfig,
                                    inputTypeInfo,
                                    sortKeyComparator,
                                    sortKeySelector,
                                    rankType,
                                    rankRange,
                                    generateUpdateBefore,
                                    outputRankNumber,
                                    topNCacheSize)
                            : new AppendOnlyTopNFunction(
                                    stateTtlConfig,
                                    inputTypeInfo,
                                    sortKeyComparator,
                                    sortKeySelector,
                                    rankType,
                                    rankRange,
                                    generateUpdateBefore,
                                    outputRankNumber,
                                    topNCacheSize);
        }
    } else if (rankStrategy instanceof RankProcessStrategy.UpdateFastStrategy) {
        if (RankUtil.isTop1(rankRange)) {
            rankFunction =
                    asyncState
                            ? new AsyncStateFastTop1Function(
                                    stateTtlConfig,
                                    inputTypeInfo,
                                    sortKeyComparator,
                                    sortKeySelector,
                                    rankType,
                                    rankRange,
                                    generateUpdateBefore,
                                    outputRankNumber,
                                    topNCacheSize)
                            : new FastTop1Function(
                                    stateTtlConfig,
                                    inputTypeInfo,
                                    sortKeyComparator,
                                    sortKeySelector,
                                    rankType,
                                    rankRange,
                                    generateUpdateBefore,
                                    outputRankNumber,
                                    topNCacheSize);
        } else {
            // The updatable TopN function additionally needs a selector over the primary keys
            // reported by the strategy; the async flag is not consulted on this path.
            int[] primaryKeys =
                    ((RankProcessStrategy.UpdateFastStrategy) rankStrategy).getPrimaryKeys();
            RowDataKeySelector rowKeySelector =
                    KeySelectorUtil.getRowDataSelector(classLoader, primaryKeys, inputTypeInfo);
            rankFunction =
                    new UpdatableTopNFunction(
                            stateTtlConfig,
                            inputTypeInfo,
                            rowKeySelector,
                            sortKeyComparator,
                            sortKeySelector,
                            rankType,
                            rankRange,
                            generateUpdateBefore,
                            outputRankNumber,
                            topNCacheSize);
        }
    } else if (rankStrategy instanceof RankProcessStrategy.RetractStrategy) {
        // The retractable function gets a generated equaliser over all input field types and a
        // comparator wrapper carrying the sort-key metadata next to the generated comparator.
        LogicalType[] inputFieldTypes =
                inputRowType.getFields().stream()
                        .map(RowType.RowField::getType)
                        .toArray(LogicalType[]::new);
        GeneratedRecordEqualiser generatedEqualiser =
                new EqualiserCodeGenerator(inputFieldTypes, classLoader)
                        .generateRecordEqualiser("RankValueEqualiser");
        ComparableRecordComparator comparableComparator =
                new ComparableRecordComparator(
                        sortKeyComparator,
                        sortKeyPositions,
                        sortSpec.getFieldTypes(inputRowType),
                        sortSpec.getAscendingOrders(),
                        sortSpec.getNullsIsLast());
        rankFunction =
                new RetractableTopNFunction(
                        stateTtlConfig,
                        inputTypeInfo,
                        comparableComparator,
                        sortKeySelector,
                        rankType,
                        rankRange,
                        generatedEqualiser,
                        generateUpdateBefore,
                        outputRankNumber);
    } else {
        throw new TableException(
                String.format("rank strategy:%s is not supported.", rankStrategy));
    }

    // The operator flavour follows the concrete function type, not the config flag directly.
    StreamOperator<RowData> rankOperator;
    if (rankFunction instanceof AbstractAsyncStateTopNFunction) {
        rankOperator = new AsyncKeyedProcessOperator<>(rankFunction);
    } else {
        rankOperator = new KeyedProcessOperator<>(rankFunction);
    }
    rankFunction.setKeyContext(rankOperator);

    // The rank transformation reuses the parallelism of its input transformation.
    OneInputTransformation<RowData, RowData> rankTransform =
            ExecNodeUtil.createOneInputTransformation(
                    upstream,
                    createTransformationMeta(RANK_TRANSFORMATION, config),
                    rankOperator,
                    InternalTypeInfo.of((RowType) getOutputType()),
                    upstream.getParallelism(),
                    false);

    // State is keyed by the partition fields of the rank.
    RowDataKeySelector partitionKeySelector =
            KeySelectorUtil.getRowDataSelector(
                    classLoader, partitionSpec.getFieldIndices(), inputTypeInfo);
    rankTransform.setStateKeySelector(partitionKeySelector);
    rankTransform.setStateKeyType(partitionKeySelector.getProducedType());
    return rankTransform;
}