private void cleanupKeyedStates()

in flink-ml-iteration/flink-ml-iteration-1.15/src/main/java/org/apache/flink/iteration/operator/perround/AbstractPerRoundWrapperOperator.java [605:652]


    private void cleanupKeyedStates(int round) {
        String roundPrefix = getRoundStatePrefix(round);
        KeyedStateBackend<?> keyedStateBackend = stateHandler.getKeyedStateBackend();
        if (keyedStateBackend.getClass().getName().equals(HEAP_KEYED_STATE_NAME)) {
            ReflectionUtils.<Map<String, ?>>getFieldValue(
                            keyedStateBackend, HeapKeyedStateBackend.class, "registeredKVStates")
                    .entrySet()
                    .removeIf(entry -> entry.getKey().startsWith(roundPrefix));
            ReflectionUtils.<Map<String, ?>>getFieldValue(
                            keyedStateBackend,
                            AbstractKeyedStateBackend.class,
                            "keyValueStatesByName")
                    .entrySet()
                    .removeIf(entry -> entry.getKey().startsWith(roundPrefix));
        } else if (keyedStateBackend.getClass().getName().equals(ROCKSDB_KEYED_STATE_NAME)) {
            RocksDB db =
                    ReflectionUtils.getFieldValue(
                            keyedStateBackend, RocksDBKeyedStateBackend.class, "db");
            HashMap<String, RocksDBKeyedStateBackend.RocksDbKvStateInfo> kvStateInformation =
                    ReflectionUtils.getFieldValue(
                            keyedStateBackend,
                            RocksDBKeyedStateBackend.class,
                            "kvStateInformation");
            kvStateInformation.entrySet().stream()
                    .filter(entry -> entry.getKey().startsWith(roundPrefix))
                    .forEach(
                            entry -> {
                                try {
                                    db.dropColumnFamily(entry.getValue().columnFamilyHandle);
                                } catch (Exception e) {
                                    LOG.error(
                                            "Failed to drop state {} for round {}",
                                            entry.getKey(),
                                            round);
                                }
                            });
            kvStateInformation.entrySet().removeIf(entry -> entry.getKey().startsWith(roundPrefix));

            Map<String, ?> field =
                    ReflectionUtils.getFieldValue(
                            keyedStateBackend,
                            AbstractKeyedStateBackend.class,
                            "keyValueStatesByName");
            field.entrySet().removeIf(entry -> entry.getKey().startsWith(roundPrefix));
        } else {
            LOG.warn("Unable to cleanup the keyed state {}", keyedStateBackend);
        }
    }