flink-ml-iteration/flink-ml-iteration-common/src/main/java/org/apache/flink/iteration/operator/perround/AbstractPerRoundWrapperOperator.java [184:204]:
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
    protected abstract void endInputAndEmitMaxWatermark(S operator, int epoch, int epochWatermark)
            throws Exception;

    protected void closeStreamOperator(S operator, int epoch, int epochWatermark) throws Exception {
        setIterationContextRound(epoch);
        OperatorUtils.processOperatorOrUdfIfSatisfy(
                operator,
                IterationListener.class,
                listener -> notifyEpochWatermarkIncrement(listener, epochWatermark));
        endInputAndEmitMaxWatermark(operator, epoch, epochWatermark);
        operator.finish();
        operator.close();
        setIterationContextRound(null);

        // Cleanup the states used by this operator.
        cleanupOperatorStates(epoch);

        if (stateHandler.getKeyedStateBackend() != null) {
            cleanupKeyedStates(epoch);
        }
    }
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -



flink-ml-iteration/flink-ml-iteration-1.15/src/main/java/org/apache/flink/iteration/operator/perround/AbstractPerRoundWrapperOperator.java [184:204]:
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
    protected abstract void endInputAndEmitMaxWatermark(S operator, int epoch, int epochWatermark)
            throws Exception;

    protected void closeStreamOperator(S operator, int epoch, int epochWatermark) throws Exception {
        setIterationContextRound(epoch);
        OperatorUtils.processOperatorOrUdfIfSatisfy(
                operator,
                IterationListener.class,
                listener -> notifyEpochWatermarkIncrement(listener, epochWatermark));
        endInputAndEmitMaxWatermark(operator, epoch, epochWatermark);
        operator.finish();
        operator.close();
        setIterationContextRound(null);

        // Cleanup the states used by this operator.
        cleanupOperatorStates(epoch);

        if (stateHandler.getKeyedStateBackend() != null) {
            cleanupKeyedStates(epoch);
        }
    }
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -



