flink-ml-iteration/flink-ml-iteration-common/src/main/java/org/apache/flink/iteration/operator/perround/AbstractPerRoundWrapperOperator.java [399:440]:
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
    public void snapshotState(StateSnapshotContext context) throws Exception {
        OperatorStateCheckpointOutputStream rawOperatorStateOutputStream =
                context.getRawOperatorStateOutput();
        List<Integer> operatorStateEpoch = new ArrayList<>();

        List<Integer> sortedEpochs = new ArrayList<>(wrappedOperators.keySet());
        Collections.sort(sortedEpochs);

        for (int epoch : sortedEpochs) {
            S wrappedOperator = wrappedOperators.get(epoch);
            if (StreamOperatorStateHandler.CheckpointedStreamOperator.class.isAssignableFrom(
                    wrappedOperator.getClass())) {
                ((StreamOperatorStateHandler.CheckpointedStreamOperator) wrappedOperator)
                        .snapshotState(new ProxyStateSnapshotContext(context));

                // Gets the count of the raw operator state.
                int numberOfPartitions = rawOperatorStateOutputStream.getNumberOfPartitions();
                while (operatorStateEpoch.size() < numberOfPartitions) {
                    operatorStateEpoch.add(epoch);
                }
            }
        }

        // Then snapshot our own states
        // Always clear the union list state before set value.
        parallelismState.clear();
        if (containingTask.getEnvironment().getTaskInfo().getIndexOfThisSubtask() == 0) {
            parallelismState.update(
                    Collections.singletonList(
                            containingTask
                                    .getEnvironment()
                                    .getTaskInfo()
                                    .getNumberOfParallelSubtasks()));
        }
        latestEpochWatermarkState.update(Collections.singletonList(latestEpochWatermark));

        // The list must be sorted
        rawStateEpochState.update(operatorStateEpoch);

        // The list must be sorted
        pendingEpochState.update(sortedEpochs);
    }
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -



flink-ml-iteration/flink-ml-iteration-1.15/src/main/java/org/apache/flink/iteration/operator/perround/AbstractPerRoundWrapperOperator.java [399:440]:
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
    public void snapshotState(StateSnapshotContext context) throws Exception {
        OperatorStateCheckpointOutputStream rawOperatorStateOutputStream =
                context.getRawOperatorStateOutput();
        List<Integer> operatorStateEpoch = new ArrayList<>();

        List<Integer> sortedEpochs = new ArrayList<>(wrappedOperators.keySet());
        Collections.sort(sortedEpochs);

        for (int epoch : sortedEpochs) {
            S wrappedOperator = wrappedOperators.get(epoch);
            if (StreamOperatorStateHandler.CheckpointedStreamOperator.class.isAssignableFrom(
                    wrappedOperator.getClass())) {
                ((StreamOperatorStateHandler.CheckpointedStreamOperator) wrappedOperator)
                        .snapshotState(new ProxyStateSnapshotContext(context));

                // Gets the count of the raw operator state.
                int numberOfPartitions = rawOperatorStateOutputStream.getNumberOfPartitions();
                while (operatorStateEpoch.size() < numberOfPartitions) {
                    operatorStateEpoch.add(epoch);
                }
            }
        }

        // Then snapshot our own states
        // Always clear the union list state before set value.
        parallelismState.clear();
        if (containingTask.getEnvironment().getTaskInfo().getIndexOfThisSubtask() == 0) {
            parallelismState.update(
                    Collections.singletonList(
                            containingTask
                                    .getEnvironment()
                                    .getTaskInfo()
                                    .getNumberOfParallelSubtasks()));
        }
        latestEpochWatermarkState.update(Collections.singletonList(latestEpochWatermark));

        // The list must be sorted
        rawStateEpochState.update(operatorStateEpoch);

        // The list must be sorted
        pendingEpochState.update(sortedEpochs);
    }
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -



