in src/main/java/org/apache/flink/benchmark/CheckpointEnvironmentContext.java [54:72]
public void setUp() throws Exception {
    // Run the superclass setup before configuring and launching the job.
    super.setUp();

    env.setParallelism(CheckpointEnvironmentContext.JOB_PARALLELISM);
    // Checkpointing is enabled with an interval of Long.MAX_VALUE; presumably this keeps
    // periodic checkpoints from firing on their own during the benchmark (confirm with callers).
    env.enableCheckpointing(Long.MAX_VALUE);

    // Build the graph once: its stream graph is submitted, its sources are used below.
    final StreamGraphWithSources graphAndSources = getStreamGraph();

    // Submit asynchronously and remember the job id for the waits that follow.
    jobID = env.executeAsync(graphAndSources.getStreamGraph()).getJobID();

    // Block until all tasks of the job are running, then until the sources report backpressure.
    CommonTestUtils.waitForAllTaskRunning(miniCluster, jobID, false);
    BackpressureUtils.waitForBackpressure(
            jobID,
            graphAndSources.getSources(),
            miniCluster.getRestAddress().get(),
            miniCluster.getConfiguration());

    // Optional extra pause after setup; skipped when no positive sleep time is configured.
    if (getSleepPostSetUp() <= 0) {
        return;
    }
    Thread.sleep(getSleepPostSetUp());
}