in src/main/java/org/apache/flink/benchmark/BackpressureUtils.java [41:70]
static void waitForBackpressure(
JobID jobID,
List<JobVertexID> sourceId,
URI restAddress,
Configuration clientConfiguration)
throws Exception {
RestClusterClient<StandaloneClusterId> restClient =
createClient(restAddress.getPort(), clientConfiguration);
Deadline deadline = Deadline.fromNow(Duration.ofSeconds(30));
boolean allBackpressured;
// There seems to be a race condition in some setups, between setting up REST server
// and client being able to connect. This is handled by the retrying mechanism in
// the RestClusterClient, but time out takes a lot of time to trigger, so we are
// doing a little bit of sleep here in an attempt to avoid waiting for that timeout.
Thread.sleep(100);
do {
allBackpressured =
sourceId.stream()
.map(id -> queryBackpressure(jobID, id, restClient, restAddress))
.allMatch(
level ->
level
== JobVertexBackPressureInfo
.VertexBackPressureLevel.HIGH);
} while (!allBackpressured && deadline.hasTimeLeft());
if (!allBackpressured) {
throw new FlinkRuntimeException(
"Could not trigger backpressure for the job in given time.");
}
}