public static StreamConfig createWrappedOperatorConfig()

in flink-ml-iteration/flink-ml-iteration-common/src/main/java/org/apache/flink/iteration/operator/OperatorUtils.java [97:153]


    public static StreamConfig createWrappedOperatorConfig(StreamConfig config, ClassLoader cl) {
        StreamConfig wrappedConfig = new StreamConfig(config.getConfiguration().clone());
        for (int i = 0; i < wrappedConfig.getNumberOfNetworkInputs(); ++i) {
            KeySelector keySelector = config.getStatePartitioner(i, cl);
            if (keySelector != null) {
                checkState(
                        keySelector instanceof ProxyKeySelector,
                        "The state partitioner for the wrapper operator should always be ProxyKeySelector, but it is "
                                + keySelector);
                wrappedConfig.setStatePartitioner(
                        i, ((ProxyKeySelector) keySelector).getWrappedKeySelector());
            }
        }

        StreamConfig.InputConfig[] inputs = config.getInputs(cl);
        for (int i = 0; i < inputs.length; ++i) {
            if (inputs[i] instanceof NetworkInputConfig) {
                TypeSerializer<?> typeSerializerIn =
                        ((NetworkInputConfig) inputs[i]).getTypeSerializer();
                checkState(
                        typeSerializerIn instanceof IterationRecordSerializer,
                        "The serializer of input[%s] should be IterationRecordSerializer but it is %s.",
                        i,
                        typeSerializerIn);
                inputs[i] =
                        new NetworkInputConfig(
                                ((IterationRecordSerializer<?>) typeSerializerIn)
                                        .getInnerSerializer(),
                                i);
            }
        }
        wrappedConfig.setInputs(inputs);

        TypeSerializer<?> typeSerializerOut = config.getTypeSerializerOut(cl);
        checkState(
                typeSerializerOut instanceof IterationRecordSerializer,
                "The serializer of output should be IterationRecordSerializer but it is %s.",
                typeSerializerOut);
        wrappedConfig.setTypeSerializerOut(
                ((IterationRecordSerializer<?>) typeSerializerOut).getInnerSerializer());

        config.getChainedOutputs(cl)
                .forEach(
                        chainedOutput -> {
                            OutputTag<?> outputTag = chainedOutput.getOutputTag();
                            setTypeSerializerSideOut(outputTag, config, wrappedConfig, cl);
                        });
        config.getOperatorNonChainedOutputs(cl)
                .forEach(
                        nonChainedOutput -> {
                            OutputTag<?> outputTag = nonChainedOutput.getOutputTag();
                            setTypeSerializerSideOut(outputTag, config, wrappedConfig, cl);
                        });

        wrappedConfig.serializeAllConfigs();
        return wrappedConfig;
    }