protected Transformation translateToPlanInternal()

in flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecRank.java [193:421]


    protected Transformation<RowData> translateToPlanInternal(
            PlannerBase planner, ExecNodeConfig config) {
        // Overview: validates the rank function, translates the single input edge, derives the
        // sort-key selector/comparator, chooses a process function based on the rank strategy,
        // wraps it into a keyed process operator and finally keys the state by the partition
        // fields.

        // Only ROW_NUMBER falls through; every other rank function is rejected up front.
        switch (rankType) {
            case RANK:
                throw new TableException("RANK() on streaming table is not supported currently");
            case DENSE_RANK:
                throw new TableException(
                        "DENSE_RANK() on streaming table is not supported currently");
            case ROW_NUMBER:
                break;
            default:
                throw new TableException(
                        String.format(
                                "Streaming tables do not support %s rank function.", rankType));
        }

        final ExecEdge edge = getInputEdges().get(0);
        final Transformation<RowData> upstream =
                (Transformation<RowData>) edge.translateToPlan(planner);
        final RowType inputRowType = (RowType) edge.getOutputType();
        final InternalTypeInfo<RowData> inputTypeInfo = InternalTypeInfo.of(inputRowType);
        final ClassLoader classLoader = planner.getFlinkContext().getClassLoader();

        final int[] sortFieldIndices = sortSpec.getFieldIndices();
        final RowDataKeySelector sortKeySelector =
                KeySelectorUtil.getRowDataSelector(classLoader, sortFieldIndices, inputTypeInfo);

        // Build a sort spec expressed over the sort key itself: the i-th sort field is addressed
        // at position i, keeping the ordering and null placement of the original spec.
        final int[] sortKeyPositions = IntStream.range(0, sortFieldIndices.length).toArray();
        final SortSpec.SortSpecBuilder keySpecBuilder = SortSpec.builder();
        for (int pos : sortKeyPositions) {
            keySpecBuilder.addField(
                    pos,
                    sortSpec.getFieldSpec(pos).getIsAscendingOrder(),
                    sortSpec.getFieldSpec(pos).getNullIsLast());
        }
        final SortSpec sortSpecOnKey = keySpecBuilder.build();
        final GeneratedRecordComparator sortKeyComparator =
                ComparatorCodeGenerator.gen(
                        config,
                        classLoader,
                        "StreamExecSortComparator",
                        RowType.of(sortSpec.getFieldTypes(inputRowType)),
                        sortSpecOnKey);

        final long topNCacheSize = config.get(TABLE_EXEC_RANK_TOPN_CACHE_SIZE);
        final long stateTtlMillis =
                StateMetadata.getStateTtlForOneInputOperator(config, stateMetadataList);
        final StateTtlConfig stateTtl = StateConfigUtil.createTtlConfig(stateTtlMillis);
        final boolean asyncState =
                config.get(ExecutionConfigOptions.TABLE_EXEC_ASYNC_STATE_ENABLED);

        final boolean appendFast =
                rankStrategy instanceof RankProcessStrategy.AppendFastStrategy;
        final boolean updateFast =
                rankStrategy instanceof RankProcessStrategy.UpdateFastStrategy;

        final AbstractTopNFunction topNFunction;
        if (appendFast
                && sortFieldIndices.length == 1
                && TypeCheckUtils.isProcTime(
                        inputRowType.getChildren().get(sortFieldIndices[0]))
                && sortSpec.getFieldSpec(0).getIsAscendingOrder()) {
            // Append-only input ordered by a single ascending proctime field.
            topNFunction =
                    new AppendOnlyFirstNFunction(
                            stateTtl,
                            inputTypeInfo,
                            sortKeyComparator,
                            sortKeySelector,
                            rankType,
                            rankRange,
                            generateUpdateBefore,
                            outputRankNumber);
        } else if ((appendFast || updateFast) && RankUtil.isTop1(rankRange)) {
            // Both fast strategies share the same Top-1 function.
            if (asyncState) {
                topNFunction =
                        new AsyncStateFastTop1Function(
                                stateTtl,
                                inputTypeInfo,
                                sortKeyComparator,
                                sortKeySelector,
                                rankType,
                                rankRange,
                                generateUpdateBefore,
                                outputRankNumber,
                                topNCacheSize);
            } else {
                topNFunction =
                        new FastTop1Function(
                                stateTtl,
                                inputTypeInfo,
                                sortKeyComparator,
                                sortKeySelector,
                                rankType,
                                rankRange,
                                generateUpdateBefore,
                                outputRankNumber,
                                topNCacheSize);
            }
        } else if (appendFast) {
            if (asyncState) {
                topNFunction =
                        new AsyncStateAppendOnlyTopNFunction(
                                stateTtl,
                                inputTypeInfo,
                                sortKeyComparator,
                                sortKeySelector,
                                rankType,
                                rankRange,
                                generateUpdateBefore,
                                outputRankNumber,
                                topNCacheSize);
            } else {
                topNFunction =
                        new AppendOnlyTopNFunction(
                                stateTtl,
                                inputTypeInfo,
                                sortKeyComparator,
                                sortKeySelector,
                                rankType,
                                rankRange,
                                generateUpdateBefore,
                                outputRankNumber,
                                topNCacheSize);
            }
        } else if (updateFast) {
            // The updatable function additionally needs a selector over the strategy's
            // primary keys.
            final int[] primaryKeyIndices =
                    ((RankProcessStrategy.UpdateFastStrategy) rankStrategy).getPrimaryKeys();
            final RowDataKeySelector primaryKeySelector =
                    KeySelectorUtil.getRowDataSelector(
                            classLoader, primaryKeyIndices, inputTypeInfo);
            topNFunction =
                    new UpdatableTopNFunction(
                            stateTtl,
                            inputTypeInfo,
                            primaryKeySelector,
                            sortKeyComparator,
                            sortKeySelector,
                            rankType,
                            rankRange,
                            generateUpdateBefore,
                            outputRankNumber,
                            topNCacheSize);
        } else if (rankStrategy instanceof RankProcessStrategy.RetractStrategy) {
            // The retractable function takes a generated equaliser over all input fields and
            // a comparable wrapper around the generated sort-key comparator.
            final LogicalType[] inputFieldTypes =
                    inputRowType.getFields().stream()
                            .map(RowType.RowField::getType)
                            .toArray(LogicalType[]::new);
            final EqualiserCodeGenerator equaliserGenerator =
                    new EqualiserCodeGenerator(inputFieldTypes, classLoader);
            final GeneratedRecordEqualiser rowEqualiser =
                    equaliserGenerator.generateRecordEqualiser("RankValueEqualiser");
            final ComparableRecordComparator comparableComparator =
                    new ComparableRecordComparator(
                            sortKeyComparator,
                            sortKeyPositions,
                            sortSpec.getFieldTypes(inputRowType),
                            sortSpec.getAscendingOrders(),
                            sortSpec.getNullsIsLast());
            topNFunction =
                    new RetractableTopNFunction(
                            stateTtl,
                            inputTypeInfo,
                            comparableComparator,
                            sortKeySelector,
                            rankType,
                            rankRange,
                            rowEqualiser,
                            generateUpdateBefore,
                            outputRankNumber);
        } else {
            throw new TableException(
                    String.format("rank strategy:%s is not supported.", rankStrategy));
        }

        // Async-state functions get the async operator; all others the regular keyed one.
        final StreamOperator<RowData> rankOperator;
        if (topNFunction instanceof AbstractAsyncStateTopNFunction) {
            rankOperator = new AsyncKeyedProcessOperator<>(topNFunction);
        } else {
            rankOperator = new KeyedProcessOperator<>(topNFunction);
        }
        topNFunction.setKeyContext(rankOperator);

        final OneInputTransformation<RowData, RowData> rankTransform =
                ExecNodeUtil.createOneInputTransformation(
                        upstream,
                        createTransformationMeta(RANK_TRANSFORMATION, config),
                        rankOperator,
                        InternalTypeInfo.of((RowType) getOutputType()),
                        upstream.getParallelism(),
                        false);

        // Key the operator state by the partition fields of the rank.
        final RowDataKeySelector partitionKeySelector =
                KeySelectorUtil.getRowDataSelector(
                        classLoader, partitionSpec.getFieldIndices(), inputTypeInfo);
        rankTransform.setStateKeySelector(partitionKeySelector);
        rankTransform.setStateKeyType(partitionKeySelector.getProducedType());
        return rankTransform;
    }