public static StreamConfig createWrappedOperatorConfig()

in flink-ml-iteration/flink-ml-iteration-1.15/src/main/java/org/apache/flink/iteration/operator/OperatorUtils.java [98:165]


    /**
     * Derives the {@link StreamConfig} of the wrapped operator from the config of its wrapper
     * operator.
     *
     * <p>The result starts as a copy of {@code config}'s underlying configuration. The
     * iteration-specific wrappers are then removed from it:
     *
     * <ul>
     *   <li>each non-null state partitioner, which must be a {@code ProxyKeySelector}, is replaced
     *       by the key selector it wraps;
     *   <li>each {@code NetworkInputConfig} input, whose serializer must be an {@code
     *       IterationRecordSerializer}, is rebuilt around the inner serializer;
     *   <li>the output serializer, which must be an {@code IterationRecordSerializer}, is replaced
     *       by its inner serializer;
     *   <li>for each chained or non-chained output edge that carries an output tag, the side-output
     *       serializer is registered under a tag with the same id and the inner type information.
     * </ul>
     *
     * <p>Any violated expectation above fails the corresponding {@code checkState} precondition.
     *
     * @param config the stream config of the wrapper operator; it is only read, never modified.
     * @param cl the class loader used to deserialize the objects stored in {@code config}.
     * @return a new config holding the unwrapped partitioners and serializers.
     */
    public static StreamConfig createWrappedOperatorConfig(StreamConfig config, ClassLoader cl) {
        // Work on a clone so that the wrapper operator's own config stays untouched.
        StreamConfig wrappedConfig = new StreamConfig(config.getConfiguration().clone());

        for (int i = 0; i < wrappedConfig.getNumberOfNetworkInputs(); ++i) {
            KeySelector keySelector = config.getStatePartitioner(i, cl);
            if (keySelector != null) {
                // Template form: the message is only rendered when the check actually fails.
                checkState(
                        keySelector instanceof ProxyKeySelector,
                        "The state partitioner for the wrapper operator should always be ProxyKeySelector, but it is %s",
                        keySelector);
                wrappedConfig.setStatePartitioner(
                        i, ((ProxyKeySelector) keySelector).getWrappedKeySelector());
            }
        }

        StreamConfig.InputConfig[] inputs = config.getInputs(cl);
        for (int i = 0; i < inputs.length; ++i) {
            // Inputs of any other kind are carried over unchanged.
            if (inputs[i] instanceof NetworkInputConfig) {
                TypeSerializer<?> typeSerializerIn =
                        ((NetworkInputConfig) inputs[i]).getTypeSerializer();
                checkState(
                        typeSerializerIn instanceof IterationRecordSerializer,
                        "The serializer of input[%s] should be IterationRecordSerializer but it is %s.",
                        i,
                        typeSerializerIn);
                // NOTE(review): the rebuilt config takes only the inner serializer and the array
                // position i; nothing else is copied from the original input config. Confirm
                // that this is intended.
                inputs[i] =
                        new NetworkInputConfig(
                                ((IterationRecordSerializer<?>) typeSerializerIn)
                                        .getInnerSerializer(),
                                i);
            }
        }
        wrappedConfig.setInputs(inputs);

        TypeSerializer<?> typeSerializerOut = config.getTypeSerializerOut(cl);
        checkState(
                typeSerializerOut instanceof IterationRecordSerializer,
                "The serializer of output should be IterationRecordSerializer but it is %s.",
                typeSerializerOut);
        wrappedConfig.setTypeSerializerOut(
                ((IterationRecordSerializer<?>) typeSerializerOut).getInnerSerializer());

        Stream.concat(
                        config.getChainedOutputs(cl).stream(),
                        config.getNonChainedOutputs(cl).stream())
                .forEach(
                        edge -> {
                            OutputTag<?> outputTag = edge.getOutputTag();
                            // Only edges that carry an output tag are side outputs.
                            if (outputTag != null) {
                                TypeSerializer<?> typeSerializerSideOut =
                                        config.getTypeSerializerSideOut(outputTag, cl);
                                checkState(
                                        typeSerializerSideOut instanceof IterationRecordSerializer,
                                        "The serializer of side output with tag[%s] should be IterationRecordSerializer but it is %s.",
                                        outputTag,
                                        typeSerializerSideOut);
                                // Guard the type-info cast as well, so a mismatch is reported
                                // with context instead of as a bare ClassCastException.
                                checkState(
                                        outputTag.getTypeInfo() instanceof IterationRecordTypeInfo,
                                        "The type information of side output with tag[%s] should be IterationRecordTypeInfo but it is %s.",
                                        outputTag,
                                        outputTag.getTypeInfo());
                                wrappedConfig.setTypeSerializerSideOut(
                                        new OutputTag<>(
                                                outputTag.getId(),
                                                ((IterationRecordTypeInfo<?>)
                                                                outputTag.getTypeInfo())
                                                        .getInnerTypeInfo()),
                                        ((IterationRecordSerializer<?>) typeSerializerSideOut)
                                                .getInnerSerializer());
                            }
                        });

        return wrappedConfig;
    }