flink-ml-iteration/flink-ml-iteration-common/src/main/java/org/apache/flink/iteration/operator/perround/AbstractPerRoundWrapperOperator.java [581:603]:
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
    /**
     * Removes the operator-state entries registered under the given round's state prefix.
     *
     * <p>If the backend returned by {@code stateHandler} is a {@code DefaultOperatorStateBackend},
     * each of the name-keyed maps listed below is obtained through {@code
     * ReflectionUtils.getFieldValue} and every entry whose key starts with the round prefix is
     * dropped. For any other backend type nothing is removed and a warning is logged.
     *
     * @param round the round whose prefixed state entries should be removed
     */
    private void cleanupOperatorStates(int round) {
        final String prefix = getRoundStatePrefix(round);
        final OperatorStateBackend backend = stateHandler.getOperatorStateBackend();

        // Only the default backend exposes the maps we know how to prune.
        if (!(backend instanceof DefaultOperatorStateBackend)) {
            LOG.warn("Unable to cleanup the operator state {}", backend);
            return;
        }

        // Names of the map-typed fields on DefaultOperatorStateBackend that are pruned.
        final String[] stateMapFields = {
            "registeredOperatorStates",
            "registeredBroadcastStates",
            "accessedStatesByName",
            "accessedBroadcastStatesByName"
        };

        for (String fieldName : stateMapFields) {
            Map<String, ?> statesByName =
                    ReflectionUtils.getFieldValue(
                            backend, DefaultOperatorStateBackend.class, fieldName);
            // Removing through the key view removes the corresponding mappings from the map.
            statesByName.keySet().removeIf(stateName -> stateName.startsWith(prefix));
        }
    }
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -



flink-ml-iteration/flink-ml-iteration-1.15/src/main/java/org/apache/flink/iteration/operator/perround/AbstractPerRoundWrapperOperator.java [581:603]:
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
    /**
     * Removes the operator-state entries registered under the given round's state prefix.
     *
     * <p>If the backend returned by {@code stateHandler} is a {@code DefaultOperatorStateBackend},
     * each of the name-keyed maps listed below is obtained through {@code
     * ReflectionUtils.getFieldValue} and every entry whose key starts with the round prefix is
     * dropped. For any other backend type nothing is removed and a warning is logged.
     *
     * @param round the round whose prefixed state entries should be removed
     */
    private void cleanupOperatorStates(int round) {
        final String prefix = getRoundStatePrefix(round);
        final OperatorStateBackend backend = stateHandler.getOperatorStateBackend();

        // Only the default backend exposes the maps we know how to prune.
        if (!(backend instanceof DefaultOperatorStateBackend)) {
            LOG.warn("Unable to cleanup the operator state {}", backend);
            return;
        }

        // Names of the map-typed fields on DefaultOperatorStateBackend that are pruned.
        final String[] stateMapFields = {
            "registeredOperatorStates",
            "registeredBroadcastStates",
            "accessedStatesByName",
            "accessedBroadcastStatesByName"
        };

        for (String fieldName : stateMapFields) {
            Map<String, ?> statesByName =
                    ReflectionUtils.getFieldValue(
                            backend, DefaultOperatorStateBackend.class, fieldName);
            // Removing through the key view removes the corresponding mappings from the map.
            statesByName.keySet().removeIf(stateName -> stateName.startsWith(prefix));
        }
    }
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -



