in src/main/java/org/apache/flink/benchmark/StateBackendBenchmarkBase.java [61:98]
/**
 * Prepares the benchmark environment for the requested state backend and attaches the
 * record source.
 *
 * <p>Runs the parent {@code setUp()}, derives a configuration from the environment's current
 * one, selects the state backend / checkpoint storage matching {@code stateBackend}, applies
 * that configuration to {@code env}, and finally registers an {@code IntegerLongSource} as
 * {@code source}.
 *
 * @param stateBackend which state backend / checkpoint storage combination to configure
 * @param recordsPerInvocation passed through to the {@code IntegerLongSource} constructor
 * @throws IOException if the parent set-up fails (the original failure is kept as the cause
 *     when it is not already an {@code IOException})
 * @throws UnsupportedOperationException if {@code stateBackend} is not one of the handled
 *     constants
 */
public void setUp(StateBackend stateBackend, long recordsPerInvocation) throws IOException {
    try {
        super.setUp();
    } catch (IOException | RuntimeException e) {
        throw e;
    } catch (Exception e) {
        // Do not continue on a half-initialised environment: surface the failure with its cause.
        throw new IOException("Failed to set up the benchmark environment", e);
    }

    // Work on a copy of the environment's configuration; it is applied back to env below.
    Configuration configuration = Configuration.fromMap(env.getConfiguration().toMap());
    String checkpointDataUri = "file://" + checkpointDir.getAbsolutePath();
    switch (stateBackend) {
        case MEMORY:
            configuration.set(StateBackendOptions.STATE_BACKEND, "hashmap");
            configuration.set(CheckpointingOptions.CHECKPOINT_STORAGE, "jobmanager");
            break;
        case FS:
            configuration.set(StateBackendOptions.STATE_BACKEND, "hashmap");
            configuration.set(CheckpointingOptions.CHECKPOINT_STORAGE, "filesystem");
            configuration.set(CheckpointingOptions.CHECKPOINTS_DIRECTORY, checkpointDataUri);
            break;
        case ROCKS:
            configuration.set(StateBackendOptions.STATE_BACKEND, "rocksdb");
            configuration.set(CheckpointingOptions.CHECKPOINT_STORAGE, "filesystem");
            configuration.set(CheckpointingOptions.CHECKPOINTS_DIRECTORY, checkpointDataUri);
            configuration.set(CheckpointingOptions.INCREMENTAL_CHECKPOINTS, false);
            break;
        case ROCKS_INC:
            configuration.set(StateBackendOptions.STATE_BACKEND, "rocksdb");
            configuration.set(CheckpointingOptions.CHECKPOINT_STORAGE, "filesystem");
            configuration.set(CheckpointingOptions.CHECKPOINTS_DIRECTORY, checkpointDataUri);
            configuration.set(CheckpointingOptions.INCREMENTAL_CHECKPOINTS, true);
            break;
        default:
            throw new UnsupportedOperationException(
                    "Unknown state backend: " + stateBackend);
    }
    // Without this call the options chosen above would never reach the environment.
    env.configure(configuration);

    // The time characteristic is intentionally left at the environment's default.
    source = env.addSource(new IntegerLongSource(numberOfElements, recordsPerInvocation));
}