public void setUp()

in src/main/java/org/apache/flink/benchmark/StateBackendBenchmarkBase.java [61:98]


        public void setUp(StateBackend stateBackend, long recordsPerInvocation) throws IOException {
            try {
                super.setUp();
            } catch (Exception e) {
                e.printStackTrace();
            }


            Configuration configuration = Configuration.fromMap(env.getConfiguration().toMap());
            String checkpointDataUri = "file://" + checkpointDir.getAbsolutePath();
            switch (stateBackend) {
                case MEMORY:
                    configuration.set(StateBackendOptions.STATE_BACKEND, "hashmap");
                    configuration.set(CheckpointingOptions.CHECKPOINT_STORAGE, "jobmanager");
                    break;
                case FS:
                    configuration.set(StateBackendOptions.STATE_BACKEND, "hashmap");
                    configuration.set(CheckpointingOptions.CHECKPOINT_STORAGE, "filesystem");
                    break;
                case ROCKS:
                    configuration.set(StateBackendOptions.STATE_BACKEND, "rocksdb");
                    configuration.set(CheckpointingOptions.CHECKPOINT_STORAGE, "filesystem");
                    configuration.set(CheckpointingOptions.INCREMENTAL_CHECKPOINTS, false);
                    break;
                case ROCKS_INC:
                    configuration.set(StateBackendOptions.STATE_BACKEND, "rocksdb");
                    configuration.set(CheckpointingOptions.CHECKPOINT_STORAGE, "filesystem");
                    configuration.set(CheckpointingOptions.INCREMENTAL_CHECKPOINTS, true);
                    break;
                default:
                    throw new UnsupportedOperationException(
                            "Unknown state backend: " + stateBackend);
            }

            // default character
            //env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
            source = env.addSource(new IntegerLongSource(numberOfElements, recordsPerInvocation));
        }