in src/main/java/org/apache/flink/state/benchmark/StateBenchmarkBase.java [64:75]
protected KeyedStateBackend<Long> createKeyedStateBackend() throws Exception {
Configuration benchMarkConfig = ConfigUtil.loadBenchMarkConf();
String stateDataDirPath = benchMarkConfig.getString(StateBenchmarkOptions.STATE_DATA_DIR);
File dataDir = null;
if (stateDataDirPath != null) {
dataDir = new File(stateDataDirPath);
if (!dataDir.exists()) {
Files.createDirectories(Paths.get(stateDataDirPath));
}
}
return StateBackendBenchmarkUtils.createKeyedStateBackend(backendType, dataDir);
}