static void waitForBackpressure()

in src/main/java/org/apache/flink/benchmark/BackpressureUtils.java [41:70]


    /**
     * Blocks until every given job vertex reports {@code HIGH} backpressure, or fails once a
     * 30 second deadline has passed.
     *
     * <p>A REST client is created for the port of {@code restAddress} and each vertex in {@code
     * sourceId} is queried via {@code queryBackpressure} in a loop. The client is closed before
     * this method returns, whether it succeeds or throws.
     *
     * <p>Note: for an empty {@code sourceId} list the check is vacuously satisfied and the method
     * returns after the initial short sleep.
     *
     * @param jobID the job whose vertices are queried
     * @param sourceId the vertices that all have to report {@code HIGH} backpressure
     * @param restAddress address of the REST endpoint; its port is used to create the client
     * @param clientConfiguration configuration handed to {@code createClient}
     * @throws FlinkRuntimeException if not all vertices reported {@code HIGH} backpressure before
     *     the deadline expired
     * @throws Exception if sleeping is interrupted or creating/closing the client fails
     */
    static void waitForBackpressure(
            JobID jobID,
            List<JobVertexID> sourceId,
            URI restAddress,
            Configuration clientConfiguration)
            throws Exception {
        // try-with-resources: the client used to be leaked on both the success and timeout paths.
        try (RestClusterClient<StandaloneClusterId> restClient =
                createClient(restAddress.getPort(), clientConfiguration)) {
            Deadline deadline = Deadline.fromNow(Duration.ofSeconds(30));
            boolean allBackpressured;
            // There seems to be a race condition in some setups between the REST server being
            // set up and the client being able to connect. The retry mechanism in the
            // RestClusterClient handles this, but its timeout takes a long time to trigger, so
            // we sleep briefly here in an attempt to avoid waiting for that timeout.
            Thread.sleep(100);
            do {
                allBackpressured =
                        sourceId.stream()
                                .map(id -> queryBackpressure(jobID, id, restClient, restAddress))
                                .allMatch(
                                        level ->
                                                level
                                                        == JobVertexBackPressureInfo
                                                                .VertexBackPressureLevel.HIGH);
            } while (!allBackpressured && deadline.hasTimeLeft());
            if (!allBackpressured) {
                throw new FlinkRuntimeException(
                        "Could not trigger backpressure for the job in given time.");
            }
        }
    }