flink-ml-iteration/flink-ml-iteration-common/src/main/java/org/apache/flink/iteration/operator/perround/AbstractPerRoundWrapperOperator.java [207:234]:
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
    public void onEpochWatermarkIncrement(int epochWatermark) throws IOException {
        // Reacts to a new epoch watermark: closes the per-round operator(s) that the watermark
        // finishes, then always forwards the notification to the parent implementation.
        checkState(epochWatermark >= 0, "The epoch watermark should be non-negative.");

        // Only a strictly larger watermark triggers any cleanup; repeated or stale values
        // fall straight through to the super call below.
        final boolean advanced = epochWatermark > latestEpochWatermark;
        if (advanced) {
            latestEpochWatermark = epochWatermark;

            // Watermarks are expected to start at 0 and grow by 1 per notification, with the
            // final notification being the exception (it carries Integer.MAX_VALUE).
            try {
                if (epochWatermark == Integer.MAX_VALUE) {
                    // Final watermark: close every operator that is still registered,
                    // visiting the rounds in ascending order.
                    List<Integer> pendingRounds = new ArrayList<>(wrappedOperators.keySet());
                    Collections.sort(pendingRounds);
                    for (Integer round : pendingRounds) {
                        S finishedOperator = wrappedOperators.remove(round);
                        closeStreamOperator(finishedOperator, round, epochWatermark);
                    }
                } else {
                    // Regular watermark: close the operator registered under exactly this
                    // round, if there is one.
                    S finishedOperator = wrappedOperators.remove(epochWatermark);
                    if (finishedOperator != null) {
                        closeStreamOperator(finishedOperator, epochWatermark, epochWatermark);
                    }
                }
            } catch (Exception failure) {
                ExceptionUtils.rethrow(failure);
            }
        }

        super.onEpochWatermarkIncrement(epochWatermark);
    }
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -



flink-ml-iteration/flink-ml-iteration-1.15/src/main/java/org/apache/flink/iteration/operator/perround/AbstractPerRoundWrapperOperator.java [207:234]:
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
    public void onEpochWatermarkIncrement(int epochWatermark) throws IOException {
        // Reacts to a new epoch watermark: closes the per-round operator(s) that the watermark
        // finishes, then always forwards the notification to the parent implementation.
        checkState(epochWatermark >= 0, "The epoch watermark should be non-negative.");

        // Only a strictly larger watermark triggers any cleanup; repeated or stale values
        // fall straight through to the super call below.
        final boolean advanced = epochWatermark > latestEpochWatermark;
        if (advanced) {
            latestEpochWatermark = epochWatermark;

            // Watermarks are expected to start at 0 and grow by 1 per notification, with the
            // final notification being the exception (it carries Integer.MAX_VALUE).
            try {
                if (epochWatermark == Integer.MAX_VALUE) {
                    // Final watermark: close every operator that is still registered,
                    // visiting the rounds in ascending order.
                    List<Integer> pendingRounds = new ArrayList<>(wrappedOperators.keySet());
                    Collections.sort(pendingRounds);
                    for (Integer round : pendingRounds) {
                        S finishedOperator = wrappedOperators.remove(round);
                        closeStreamOperator(finishedOperator, round, epochWatermark);
                    }
                } else {
                    // Regular watermark: close the operator registered under exactly this
                    // round, if there is one.
                    S finishedOperator = wrappedOperators.remove(epochWatermark);
                    if (finishedOperator != null) {
                        closeStreamOperator(finishedOperator, epochWatermark, epochWatermark);
                    }
                }
            } catch (Exception failure) {
                ExceptionUtils.rethrow(failure);
            }
        }

        super.onEpochWatermarkIncrement(epochWatermark);
    }
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -



