in src/main/java/org/apache/flink/benchmark/StateBackendBenchmarkBase.java [59:93]
/**
 * Prepares the benchmark environment for the requested state backend.
 *
 * <p>Runs the parent {@code setUp()}, creates the backend that matches {@code stateBackend}
 * (file-based backends write under {@code checkpointDir}), installs it on {@code env},
 * switches the environment to event time and registers an {@link IntegerLongSource} as
 * {@code source}.
 *
 * @param stateBackend which backend flavour to benchmark
 * @param recordsPerInvocation forwarded to the {@link IntegerLongSource} constructor
 * @throws IOException if the parent setup fails with a checked exception (non-I/O causes are
 *     wrapped), or if creating the backend reports one
 * @throws UnsupportedOperationException if {@code stateBackend} has no case in the switch below
 */
public void setUp(StateBackend stateBackend, long recordsPerInvocation) throws IOException {
    // A failed parent setup must not be swallowed: everything below relies on the state it
    // prepares, so continuing would only hide the root cause behind a later failure.
    try {
        super.setUp();
    } catch (RuntimeException e) {
        throw e;
    } catch (Exception e) {
        if (e instanceof IOException) {
            throw (IOException) e;
        }
        throw new IOException("Failed to set up the benchmark environment", e);
    }

    final AbstractStateBackend backend;
    String checkpointDataUri = "file://" + checkpointDir.getAbsolutePath();
    switch (stateBackend) {
        case MEMORY:
            backend = new MemoryStateBackend();
            break;
        case FS:
            // Second argument: asynchronous snapshots disabled.
            backend = new FsStateBackend(checkpointDataUri, false);
            break;
        case FS_ASYNC:
            // Second argument: asynchronous snapshots enabled.
            backend = new FsStateBackend(checkpointDataUri, true);
            break;
        case ROCKS:
            // Second argument: incremental checkpointing disabled.
            backend = new RocksDBStateBackend(checkpointDataUri, false);
            break;
        case ROCKS_INC:
            // Second argument: incremental checkpointing enabled.
            backend = new RocksDBStateBackend(checkpointDataUri, true);
            break;
        default:
            throw new UnsupportedOperationException(
                    "Unknown state backend: " + stateBackend);
    }

    env.setStateBackend(backend);
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
    source = env.addSource(new IntegerLongSource(numberOfElements, recordsPerInvocation));
}