in flink-ml-iteration/flink-ml-iteration-common/src/main/java/org/apache/flink/iteration/operator/perround/AbstractPerRoundWrapperOperator.java [605:661]
private void cleanupKeyedStates(int round) {
String roundPrefix = getRoundStatePrefix(round);
KeyedStateBackend<?> keyedStateBackend = stateHandler.getKeyedStateBackend();
if (keyedStateBackend.getClass().getName().equals(HEAP_KEYED_STATE_NAME)) {
ReflectionUtils.<Map<String, ?>>getFieldValue(
keyedStateBackend, HeapKeyedStateBackend.class, "registeredKVStates")
.entrySet()
.removeIf(entry -> entry.getKey().startsWith(roundPrefix));
ReflectionUtils.<Map<String, ?>>getFieldValue(
keyedStateBackend, HeapKeyedStateBackend.class, "createdKVStates")
.entrySet()
.removeIf(entry -> entry.getKey().startsWith(roundPrefix));
ReflectionUtils.<Map<String, ?>>getFieldValue(
keyedStateBackend,
AbstractKeyedStateBackend.class,
"keyValueStatesByName")
.entrySet()
.removeIf(entry -> entry.getKey().startsWith(roundPrefix));
} else if (keyedStateBackend.getClass().getName().equals(ROCKSDB_KEYED_STATE_NAME)) {
RocksDB db =
ReflectionUtils.getFieldValue(
keyedStateBackend, RocksDBKeyedStateBackend.class, "db");
HashMap<String, RocksDBKeyedStateBackend.RocksDbKvStateInfo> kvStateInformation =
ReflectionUtils.getFieldValue(
keyedStateBackend,
RocksDBKeyedStateBackend.class,
"kvStateInformation");
kvStateInformation.entrySet().stream()
.filter(entry -> entry.getKey().startsWith(roundPrefix))
.forEach(
entry -> {
try {
db.dropColumnFamily(entry.getValue().columnFamilyHandle);
} catch (Exception e) {
LOG.error(
"Failed to drop state {} for round {}",
entry.getKey(),
round);
}
});
kvStateInformation.entrySet().removeIf(entry -> entry.getKey().startsWith(roundPrefix));
ReflectionUtils.<Map<String, ?>>getFieldValue(
keyedStateBackend, RocksDBKeyedStateBackend.class, "createdKVStates")
.entrySet()
.removeIf(entry -> entry.getKey().startsWith(roundPrefix));
ReflectionUtils.<Map<String, ?>>getFieldValue(
keyedStateBackend,
AbstractKeyedStateBackend.class,
"keyValueStatesByName")
.entrySet()
.removeIf(entry -> entry.getKey().startsWith(roundPrefix));
} else {
LOG.warn("Unable to cleanup the keyed state {}", keyedStateBackend);
}
}