flink-ml-iteration/flink-ml-iteration-1.15/src/main/java/org/apache/flink/iteration/operator/perround/AbstractPerRoundWrapperOperator.java [611:641]:
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
                    .entrySet()
                    .removeIf(entry -> entry.getKey().startsWith(roundPrefix));
            ReflectionUtils.<Map<String, ?>>getFieldValue(
                            keyedStateBackend,
                            AbstractKeyedStateBackend.class,
                            "keyValueStatesByName")
                    .entrySet()
                    .removeIf(entry -> entry.getKey().startsWith(roundPrefix));
        } else if (keyedStateBackend.getClass().getName().equals(ROCKSDB_KEYED_STATE_NAME)) {
            RocksDB db =
                    ReflectionUtils.getFieldValue(
                            keyedStateBackend, RocksDBKeyedStateBackend.class, "db");
            HashMap<String, RocksDBKeyedStateBackend.RocksDbKvStateInfo> kvStateInformation =
                    ReflectionUtils.getFieldValue(
                            keyedStateBackend,
                            RocksDBKeyedStateBackend.class,
                            "kvStateInformation");
            kvStateInformation.entrySet().stream()
                    .filter(entry -> entry.getKey().startsWith(roundPrefix))
                    .forEach(
                            entry -> {
                                try {
                                    db.dropColumnFamily(entry.getValue().columnFamilyHandle);
                                } catch (Exception e) {
                                    LOG.error(
                                            "Failed to drop state {} for round {}",
                                            entry.getKey(),
                                            round);
                                }
                            });
            kvStateInformation.entrySet().removeIf(entry -> entry.getKey().startsWith(roundPrefix));
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -



flink-ml-iteration/flink-ml-iteration-common/src/main/java/org/apache/flink/iteration/operator/perround/AbstractPerRoundWrapperOperator.java [615:645]:
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
                    .entrySet()
                    .removeIf(entry -> entry.getKey().startsWith(roundPrefix));
            ReflectionUtils.<Map<String, ?>>getFieldValue(
                            keyedStateBackend,
                            AbstractKeyedStateBackend.class,
                            "keyValueStatesByName")
                    .entrySet()
                    .removeIf(entry -> entry.getKey().startsWith(roundPrefix));
        } else if (keyedStateBackend.getClass().getName().equals(ROCKSDB_KEYED_STATE_NAME)) {
            RocksDB db =
                    ReflectionUtils.getFieldValue(
                            keyedStateBackend, RocksDBKeyedStateBackend.class, "db");
            HashMap<String, RocksDBKeyedStateBackend.RocksDbKvStateInfo> kvStateInformation =
                    ReflectionUtils.getFieldValue(
                            keyedStateBackend,
                            RocksDBKeyedStateBackend.class,
                            "kvStateInformation");
            kvStateInformation.entrySet().stream()
                    .filter(entry -> entry.getKey().startsWith(roundPrefix))
                    .forEach(
                            entry -> {
                                try {
                                    db.dropColumnFamily(entry.getValue().columnFamilyHandle);
                                } catch (Exception e) {
                                    LOG.error(
                                            "Failed to drop state {} for round {}",
                                            entry.getKey(),
                                            round);
                                }
                            });
            kvStateInformation.entrySet().removeIf(entry -> entry.getKey().startsWith(roundPrefix));
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -



