flink-ml-iteration/flink-ml-iteration-common/src/main/java/org/apache/flink/iteration/Iterations.java [149:175]:
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
    public static DataStreamList iterateBoundedStreamsUntilTermination(
            DataStreamList initVariableStreams,
            ReplayableDataStreamList dataStreams,
            IterationConfig config,
            IterationBody body) {
        // Pick the operator wrapper that matches the configured operator life cycle:
        // ALL_ROUND gets the all-round wrapper, anything else the per-round wrapper.
        final OperatorWrapper lifeCycleWrapper;
        if (config.getOperatorLifeCycle() == IterationConfig.OperatorLifeCycle.ALL_ROUND) {
            lifeCycleWrapper = new AllRoundOperatorWrapper<>();
        } else {
            lifeCycleWrapper = new PerRoundOperatorWrapper<>();
        }

        // Replayed streams go first, non-replayed streams after them, so the replayed
        // ones occupy exactly the leading positions of the merged list.
        List<DataStream<?>> mergedStreams = new ArrayList<>(dataStreams.getReplayedDataStreams());
        mergedStreams.addAll(dataStreams.getNonReplayedStreams());

        // Indices [0, replayedCount) of the merged list identify the replayed streams.
        int replayedCount = dataStreams.getReplayedDataStreams().size();
        Set<Integer> replayedPositions =
                IntStream.range(0, replayedCount)
                        .mapToObj(Integer::valueOf)
                        .collect(Collectors.toSet());

        DataStreamList mergedStreamList = new DataStreamList(mergedStreams);

        // NOTE(review): the trailing `true` is a flag defined by createIteration; its
        // exact meaning is not visible here — confirm against createIteration.
        return createIteration(
                initVariableStreams,
                mergedStreamList,
                replayedPositions,
                body,
                lifeCycleWrapper,
                true);
    }
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -



flink-ml-iteration/flink-ml-iteration-1.15/src/main/java/org/apache/flink/iteration/Iterations.java [149:175]:
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
    public static DataStreamList iterateBoundedStreamsUntilTermination(
            DataStreamList initVariableStreams,
            ReplayableDataStreamList dataStreams,
            IterationConfig config,
            IterationBody body) {
        // Pick the operator wrapper that matches the configured operator life cycle:
        // ALL_ROUND gets the all-round wrapper, anything else the per-round wrapper.
        final OperatorWrapper lifeCycleWrapper;
        if (config.getOperatorLifeCycle() == IterationConfig.OperatorLifeCycle.ALL_ROUND) {
            lifeCycleWrapper = new AllRoundOperatorWrapper<>();
        } else {
            lifeCycleWrapper = new PerRoundOperatorWrapper<>();
        }

        // Replayed streams go first, non-replayed streams after them, so the replayed
        // ones occupy exactly the leading positions of the merged list.
        List<DataStream<?>> mergedStreams = new ArrayList<>(dataStreams.getReplayedDataStreams());
        mergedStreams.addAll(dataStreams.getNonReplayedStreams());

        // Indices [0, replayedCount) of the merged list identify the replayed streams.
        int replayedCount = dataStreams.getReplayedDataStreams().size();
        Set<Integer> replayedPositions =
                IntStream.range(0, replayedCount)
                        .mapToObj(Integer::valueOf)
                        .collect(Collectors.toSet());

        DataStreamList mergedStreamList = new DataStreamList(mergedStreams);

        // NOTE(review): the trailing `true` is a flag defined by createIteration; its
        // exact meaning is not visible here — confirm against createIteration.
        return createIteration(
                initVariableStreams,
                mergedStreamList,
                replayedPositions,
                body,
                lifeCycleWrapper,
                true);
    }
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -



