flink-ml-iteration/flink-ml-iteration-1.15/src/main/java/org/apache/flink/iteration/operator/OperatorUtils.java [55:137]:
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
public class OperatorUtils {

    /**
     * Builds the sender id string for one subtask of an operator.
     *
     * @param operatorId the operator whose hex string forms the first part of the id.
     * @param subtaskIndex the subtask index, appended after a {@code "-"} separator.
     * @return the operator's hex string, a dash and the subtask index, concatenated.
     */
    public static String getUniqueSenderId(OperatorID operatorId, int subtaskIndex) {
        final StringBuilder senderId = new StringBuilder();
        senderId.append(operatorId.toHexString());
        senderId.append("-");
        senderId.append(subtaskIndex);
        return senderId.toString();
    }

    /**
     * Builds a {@link FeedbackKey} out of an iteration id and a feedback index.
     *
     * @param iterationId the iteration whose hex string is passed as the key's first argument.
     * @param feedbackIndex the index passed as the key's second argument.
     * @param <V> the value type carried by the returned key.
     * @return a new {@link FeedbackKey} constructed from the two values above.
     */
    public static <V> FeedbackKey<V> createFeedbackKey(IterationID iterationId, int feedbackIndex) {
        final String iterationHex = iterationId.toHexString();
        final FeedbackKey<V> feedbackKey = new FeedbackKey<>(iterationHex, feedbackIndex);
        return feedbackKey;
    }

    /**
     * Registers the specified {@code feedbackConsumer} to the {@code feedbackChannel}.
     *
     * <p>The registration is not a direct call: the method named {@code registerConsumer}, with
     * parameter types {@link FeedbackConsumer} and {@link Executor}, is looked up on {@link
     * FeedbackChannel} and invoked through {@code ReflectionUtils.callMethod}.
     *
     * <p>NOTE(review): reflection is presumably needed because {@code registerConsumer} is not
     * accessible from this package; confirm against {@link FeedbackChannel}.
     *
     * @param feedbackChannel the channel the reflective call is made on.
     * @param feedbackConsumer the consumer passed as the first argument of the call.
     * @param executor the executor passed as the second argument of the call.
     * @param <V> the value type shared by the channel and the consumer.
     */
    public static <V> void registerFeedbackConsumer(
            FeedbackChannel<V> feedbackChannel,
            FeedbackConsumer<V> feedbackConsumer,
            Executor executor) {
        ReflectionUtils.callMethod(
                feedbackChannel,
                FeedbackChannel.class,
                "registerConsumer",
                // Parameter types identifying the method, followed by the matching arguments.
                Arrays.asList(FeedbackConsumer.class, Executor.class),
                Arrays.asList(feedbackConsumer, executor));
    }

    /**
     * Runs {@code action} on the operator itself or, failing that, on its user function.
     *
     * <p>If the class of {@code operator} is assignable to {@code targetInterface}, the action
     * receives the operator. Otherwise, if the operator is an {@link AbstractUdfStreamOperator}
     * and the class of its user function is assignable to {@code targetInterface}, the action
     * receives that user function. When neither matches, the action is not invoked.
     *
     * <p>Any {@link Exception} raised along the way, including one thrown by {@code action}, is
     * handed to {@code ExceptionUtils.rethrow}.
     *
     * @param operator the operator to inspect.
     * @param targetInterface the type the operator or its user function must be assignable to.
     * @param action the callback invoked with the matching object.
     * @param <T> the type described by {@code targetInterface}.
     */
    public static <T> void processOperatorOrUdfIfSatisfy(
            StreamOperator<?> operator,
            Class<T> targetInterface,
            ThrowingConsumer<T, Exception> action) {
        try {
            if (targetInterface.isAssignableFrom(operator.getClass())) {
                // Class#cast is a checked conversion; a plain (T) cast would be unchecked.
                action.accept(targetInterface.cast(operator));
            } else if (operator instanceof AbstractUdfStreamOperator<?, ?>) {
                Object udf = ((AbstractUdfStreamOperator<?, ?>) operator).getUserFunction();
                if (targetInterface.isAssignableFrom(udf.getClass())) {
                    action.accept(targetInterface.cast(udf));
                }
            }
        } catch (Exception e) {
            ExceptionUtils.rethrow(e);
        }
    }

    public static StreamConfig createWrappedOperatorConfig(StreamConfig config, ClassLoader cl) {
        StreamConfig wrappedConfig = new StreamConfig(config.getConfiguration().clone());
        for (int i = 0; i < wrappedConfig.getNumberOfNetworkInputs(); ++i) {
            KeySelector keySelector = config.getStatePartitioner(i, cl);
            if (keySelector != null) {
                checkState(
                        keySelector instanceof ProxyKeySelector,
                        "The state partitioner for the wrapper operator should always be ProxyKeySelector, but it is "
                                + keySelector);
                wrappedConfig.setStatePartitioner(
                        i, ((ProxyKeySelector) keySelector).getWrappedKeySelector());
            }
        }

        StreamConfig.InputConfig[] inputs = config.getInputs(cl);
        for (int i = 0; i < inputs.length; ++i) {
            if (inputs[i] instanceof NetworkInputConfig) {
                TypeSerializer<?> typeSerializerIn =
                        ((NetworkInputConfig) inputs[i]).getTypeSerializer();
                checkState(
                        typeSerializerIn instanceof IterationRecordSerializer,
                        "The serializer of input[%s] should be IterationRecordSerializer but it is %s.",
                        i,
                        typeSerializerIn);
                inputs[i] =
                        new NetworkInputConfig(
                                ((IterationRecordSerializer<?>) typeSerializerIn)
                                        .getInnerSerializer(),
                                i);
            }
        }
        wrappedConfig.setInputs(inputs);

        TypeSerializer<?> typeSerializerOut = config.getTypeSerializerOut(cl);
        checkState(
                typeSerializerOut instanceof IterationRecordSerializer,
                "The serializer of output should be IterationRecordSerializer but it is %s.",
                typeSerializerOut);
        wrappedConfig.setTypeSerializerOut(
                ((IterationRecordSerializer<?>) typeSerializerOut).getInnerSerializer());
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -



flink-ml-iteration/flink-ml-iteration-common/src/main/java/org/apache/flink/iteration/operator/OperatorUtils.java [54:136]:
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
public class OperatorUtils {

    /**
     * Builds the sender id string for one subtask of an operator.
     *
     * @param operatorId the operator whose hex string forms the first part of the id.
     * @param subtaskIndex the subtask index, appended after a {@code "-"} separator.
     * @return the operator's hex string, a dash and the subtask index, concatenated.
     */
    public static String getUniqueSenderId(OperatorID operatorId, int subtaskIndex) {
        final StringBuilder senderId = new StringBuilder();
        senderId.append(operatorId.toHexString());
        senderId.append("-");
        senderId.append(subtaskIndex);
        return senderId.toString();
    }

    /**
     * Builds a {@link FeedbackKey} out of an iteration id and a feedback index.
     *
     * @param iterationId the iteration whose hex string is passed as the key's first argument.
     * @param feedbackIndex the index passed as the key's second argument.
     * @param <V> the value type carried by the returned key.
     * @return a new {@link FeedbackKey} constructed from the two values above.
     */
    public static <V> FeedbackKey<V> createFeedbackKey(IterationID iterationId, int feedbackIndex) {
        final String iterationHex = iterationId.toHexString();
        final FeedbackKey<V> feedbackKey = new FeedbackKey<>(iterationHex, feedbackIndex);
        return feedbackKey;
    }

    /**
     * Registers the specified {@code feedbackConsumer} to the {@code feedbackChannel}.
     *
     * <p>The registration is not a direct call: the method named {@code registerConsumer}, with
     * parameter types {@link FeedbackConsumer} and {@link Executor}, is looked up on {@link
     * FeedbackChannel} and invoked through {@code ReflectionUtils.callMethod}.
     *
     * <p>NOTE(review): reflection is presumably needed because {@code registerConsumer} is not
     * accessible from this package; confirm against {@link FeedbackChannel}.
     *
     * @param feedbackChannel the channel the reflective call is made on.
     * @param feedbackConsumer the consumer passed as the first argument of the call.
     * @param executor the executor passed as the second argument of the call.
     * @param <V> the value type shared by the channel and the consumer.
     */
    public static <V> void registerFeedbackConsumer(
            FeedbackChannel<V> feedbackChannel,
            FeedbackConsumer<V> feedbackConsumer,
            Executor executor) {
        ReflectionUtils.callMethod(
                feedbackChannel,
                FeedbackChannel.class,
                "registerConsumer",
                // Parameter types identifying the method, followed by the matching arguments.
                Arrays.asList(FeedbackConsumer.class, Executor.class),
                Arrays.asList(feedbackConsumer, executor));
    }

    /**
     * Runs {@code action} on the operator itself or, failing that, on its user function.
     *
     * <p>If the class of {@code operator} is assignable to {@code targetInterface}, the action
     * receives the operator. Otherwise, if the operator is an {@link AbstractUdfStreamOperator}
     * and the class of its user function is assignable to {@code targetInterface}, the action
     * receives that user function. When neither matches, the action is not invoked.
     *
     * <p>Any {@link Exception} raised along the way, including one thrown by {@code action}, is
     * handed to {@code ExceptionUtils.rethrow}.
     *
     * @param operator the operator to inspect.
     * @param targetInterface the type the operator or its user function must be assignable to.
     * @param action the callback invoked with the matching object.
     * @param <T> the type described by {@code targetInterface}.
     */
    public static <T> void processOperatorOrUdfIfSatisfy(
            StreamOperator<?> operator,
            Class<T> targetInterface,
            ThrowingConsumer<T, Exception> action) {
        try {
            if (targetInterface.isAssignableFrom(operator.getClass())) {
                // Class#cast is a checked conversion; a plain (T) cast would be unchecked.
                action.accept(targetInterface.cast(operator));
            } else if (operator instanceof AbstractUdfStreamOperator<?, ?>) {
                Object udf = ((AbstractUdfStreamOperator<?, ?>) operator).getUserFunction();
                if (targetInterface.isAssignableFrom(udf.getClass())) {
                    action.accept(targetInterface.cast(udf));
                }
            }
        } catch (Exception e) {
            ExceptionUtils.rethrow(e);
        }
    }

    public static StreamConfig createWrappedOperatorConfig(StreamConfig config, ClassLoader cl) {
        StreamConfig wrappedConfig = new StreamConfig(config.getConfiguration().clone());
        for (int i = 0; i < wrappedConfig.getNumberOfNetworkInputs(); ++i) {
            KeySelector keySelector = config.getStatePartitioner(i, cl);
            if (keySelector != null) {
                checkState(
                        keySelector instanceof ProxyKeySelector,
                        "The state partitioner for the wrapper operator should always be ProxyKeySelector, but it is "
                                + keySelector);
                wrappedConfig.setStatePartitioner(
                        i, ((ProxyKeySelector) keySelector).getWrappedKeySelector());
            }
        }

        StreamConfig.InputConfig[] inputs = config.getInputs(cl);
        for (int i = 0; i < inputs.length; ++i) {
            if (inputs[i] instanceof NetworkInputConfig) {
                TypeSerializer<?> typeSerializerIn =
                        ((NetworkInputConfig) inputs[i]).getTypeSerializer();
                checkState(
                        typeSerializerIn instanceof IterationRecordSerializer,
                        "The serializer of input[%s] should be IterationRecordSerializer but it is %s.",
                        i,
                        typeSerializerIn);
                inputs[i] =
                        new NetworkInputConfig(
                                ((IterationRecordSerializer<?>) typeSerializerIn)
                                        .getInnerSerializer(),
                                i);
            }
        }
        wrappedConfig.setInputs(inputs);

        TypeSerializer<?> typeSerializerOut = config.getTypeSerializerOut(cl);
        checkState(
                typeSerializerOut instanceof IterationRecordSerializer,
                "The serializer of output should be IterationRecordSerializer but it is %s.",
                typeSerializerOut);
        wrappedConfig.setTypeSerializerOut(
                ((IterationRecordSerializer<?>) typeSerializerOut).getInnerSerializer());
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -



