in src/main/java/org/apache/flink/state/benchmark/ListStateBenchmark.java [92:110]
public void tearDownPerIteration() throws Exception {
applyToAllKeys(
keyedStateBackend,
STATE_DESC,
(k, state) -> {
keyedStateBackend.setCurrentKey(k);
state.clear();
});
// make the clearance effective, trigger compaction for RocksDB, and GC for heap.
if (keyedStateBackend instanceof RocksDBKeyedStateBackend) {
RocksDBKeyedStateBackend<Long> rocksDBKeyedStateBackend =
(RocksDBKeyedStateBackend<Long>) keyedStateBackend;
compactState(rocksDBKeyedStateBackend, STATE_DESC);
} else {
System.gc();
}
// wait a while for the clearance to take effect.
Thread.sleep(1000);
}