in src/main/java/org/apache/flink/benchmark/FlinkEnvironmentContext.java [51:82]
/**
 * Starts the mini cluster and creates the stream execution environment that submits to it.
 *
 * <p>The cluster configuration is obtained from {@code createConfiguration()} and then
 * overridden so that the restart strategy is {@code "none"} and the state backend is
 * {@code "hashmap"}. The same configuration object is handed to both the mini cluster and the
 * execution environment. Once the environment exists, the configured parallelism is applied
 * and object reuse is switched on if {@code objectReuse} is set.
 *
 * @throws RuntimeException if a mini cluster is already present (i.e. this method was already
 *     called), or wrapping whatever exception the mini cluster raised while starting
 * @throws Exception declared for the benefit of callers and overriding implementations
 */
public void setUp() throws Exception {
    // Guard against a second initialization of the same context.
    if (miniCluster != null) {
        throw new RuntimeException("setUp was called multiple times!");
    }

    final Configuration configuration = createConfiguration();
    configuration.set(RestartStrategyOptions.RESTART_STRATEGY, "none");
    configuration.set(StateBackendOptions.STATE_BACKEND, "hashmap");

    final MiniClusterConfiguration miniClusterConfiguration =
            new MiniClusterConfiguration.Builder()
                    .setNumSlotsPerTaskManager(getNumberOfSlotsPerTaskManager())
                    .setNumTaskManagers(getNumberOfTaskManagers())
                    .setConfiguration(configuration)
                    .build();
    miniCluster = new MiniCluster(miniClusterConfiguration);

    try {
        miniCluster.start();
    } catch (Exception startFailure) {
        // Start-up failures are surfaced as unchecked exceptions with the cause attached.
        throw new RuntimeException(startFailure);
    }

    // Bind the execution environment to the cluster that was just started.
    final MiniClusterPipelineExecutorServiceLoader executorServiceLoader =
            new MiniClusterPipelineExecutorServiceLoader(miniCluster);
    env = new StreamExecutionEnvironment(executorServiceLoader, configuration, null);
    env.setParallelism(parallelism);

    if (objectReuse) {
        env.getConfig().enableObjectReuse();
    }
}