flink-ml-iteration/flink-ml-iteration-common/src/main/java/org/apache/flink/iteration/operator/perround/AbstractPerRoundWrapperOperator.java [151:182]:
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
    /**
     * Looks up the wrapped operator registered for {@code round}; when none is registered yet, a
     * new one is created from a clone of the operator factory, initialized, and stored under
     * {@code round} before being returned.
     *
     * @param round the round whose operator is requested; also the key used in {@code
     *     wrappedOperators}.
     * @param rawOperatorStates passed through unchanged to {@code initializeStreamOperator} when
     *     a new operator has to be created; unused on a lookup hit.
     * @param count passed through unchanged to {@code initializeStreamOperator}; unused on a
     *     lookup hit.
     * @return the operator already registered for {@code round}, or the newly created one.
     */
    private S getWrappedOperator(
            int round, Iterator<StatePartitionStreamProvider> rawOperatorStates, int count) {
        S operatorForRound = wrappedOperators.get(round);

        if (operatorForRound == null) {
            try {
                // The factory is cloned for every new operator so that SimpleOperatorFactory is
                // supported as well.
                StreamOperatorFactory<T> factoryCopy =
                        InstantiationUtil.clone(
                                operatorFactory, containingTask.getUserCodeClassLoader());
                operatorForRound =
                        (S)
                                StreamOperatorFactoryUtil.<T, S>createOperator(
                                                factoryCopy,
                                                (StreamTask) parameters.getContainingTask(),
                                                OperatorUtils.createWrappedOperatorConfig(
                                                        parameters.getStreamConfig(),
                                                        containingTask.getUserCodeClassLoader()),
                                                proxyOutput,
                                                parameters.getOperatorEventDispatcher())
                                        .f0;

                // Register the operator only after it has been initialized successfully, so a
                // failed initialization never leaves an entry behind for this round.
                initializeStreamOperator(operatorForRound, round, rawOperatorStates, count);
                wrappedOperators.put(round, operatorForRound);
            } catch (Exception failure) {
                // NOTE(review): rethrow(...) is presumably expected to always throw; the return
                // below is then only reached on the success and lookup-hit paths.
                ExceptionUtils.rethrow(failure);
            }
        }

        return operatorForRound;
    }
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -



flink-ml-iteration/flink-ml-iteration-1.15/src/main/java/org/apache/flink/iteration/operator/perround/AbstractPerRoundWrapperOperator.java [151:182]:
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
    /**
     * Looks up the wrapped operator registered for {@code round}; when none is registered yet, a
     * new one is created from a clone of the operator factory, initialized, and stored under
     * {@code round} before being returned.
     *
     * @param round the round whose operator is requested; also the key used in {@code
     *     wrappedOperators}.
     * @param rawOperatorStates passed through unchanged to {@code initializeStreamOperator} when
     *     a new operator has to be created; unused on a lookup hit.
     * @param count passed through unchanged to {@code initializeStreamOperator}; unused on a
     *     lookup hit.
     * @return the operator already registered for {@code round}, or the newly created one.
     */
    private S getWrappedOperator(
            int round, Iterator<StatePartitionStreamProvider> rawOperatorStates, int count) {
        S operatorForRound = wrappedOperators.get(round);

        if (operatorForRound == null) {
            try {
                // The factory is cloned for every new operator so that SimpleOperatorFactory is
                // supported as well.
                StreamOperatorFactory<T> factoryCopy =
                        InstantiationUtil.clone(
                                operatorFactory, containingTask.getUserCodeClassLoader());
                operatorForRound =
                        (S)
                                StreamOperatorFactoryUtil.<T, S>createOperator(
                                                factoryCopy,
                                                (StreamTask) parameters.getContainingTask(),
                                                OperatorUtils.createWrappedOperatorConfig(
                                                        parameters.getStreamConfig(),
                                                        containingTask.getUserCodeClassLoader()),
                                                proxyOutput,
                                                parameters.getOperatorEventDispatcher())
                                        .f0;

                // Register the operator only after it has been initialized successfully, so a
                // failed initialization never leaves an entry behind for this round.
                initializeStreamOperator(operatorForRound, round, rawOperatorStates, count);
                wrappedOperators.put(round, operatorForRound);
            } catch (Exception failure) {
                // NOTE(review): rethrow(...) is presumably expected to always throw; the return
                // below is then only reached on the success and lookup-hit paths.
                ExceptionUtils.rethrow(failure);
            }
        }

        return operatorForRound;
    }
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -



