in src/main/java/org/apache/flink/benchmark/FlinkEnvironmentContext.java [52:83]
public void setUp() throws Exception {
    // Starts a MiniCluster and builds the StreamExecutionEnvironment that runs against it.
    // A non-null miniCluster means this method already ran, which is reported as an error.
    if (miniCluster != null) {
        throw new RuntimeException("setUp was called multiple times!");
    }

    // The same Configuration instance is handed to both the cluster and the environment.
    final Configuration clusterConfig = createConfiguration();
    final MiniClusterConfiguration miniClusterSettings =
            new MiniClusterConfiguration.Builder()
                    .setNumSlotsPerTaskManager(getNumberOfSlotsPerTaskManager())
                    .setNumTaskManagers(getNumberOfTaskManagers())
                    .setConfiguration(clusterConfig)
                    .build();

    miniCluster = new MiniCluster(miniClusterSettings);
    try {
        miniCluster.start();
    } catch (Exception startFailure) {
        // Any start-up failure is surfaced as an unchecked exception with the cause attached.
        throw new RuntimeException(startFailure);
    }

    // Build the execution environment on top of the cluster that was just started.
    // NOTE(review): the third constructor argument is left as null; presumably an optional
    // class loader — confirm against the StreamExecutionEnvironment constructor.
    final MiniClusterPipelineExecutorServiceLoader executorLoader =
            new MiniClusterPipelineExecutorServiceLoader(miniCluster);
    env = new StreamExecutionEnvironment(executorLoader, clusterConfig, null);

    env.setParallelism(parallelism);
    // Object reuse is enabled only when the objectReuse flag is set.
    if (objectReuse) {
        env.getConfig().enableObjectReuse();
    }
    env.setRestartStrategy(RestartStrategies.noRestart());
    env.setStateBackend(new MemoryStateBackend());
}