in src/main/java/com/google/cloud/dfmetrics/pipelinemanager/DataflowJobManager.java [320:357]
/**
 * Polls repeatedly until the awaited condition holds, a stop signal is seen, or the time budget
 * runs out.
 *
 * <p>Each iteration evaluates {@code allMatch(conditionCheck)} first and, if that is not
 * satisfied, {@code allMatch(stopChecking)}; it then sleeps for {@code
 * DEFAULT_CHECK_AFTER_DURATION} before the next iteration. Exceptions thrown while evaluating
 * {@code conditionCheck} are logged and treated as "not met yet". Exceptions thrown while
 * evaluating {@code stopChecking} are not caught here and propagate to the caller.
 *
 * <p>If the thread is interrupted while sleeping, the next check runs immediately and polling
 * continues; the thread's interrupt status is restored before this method returns or throws, so
 * callers can still observe the interruption.
 *
 * @param maxTimeoutDuration budget handed to {@code timeIsLeft} together with the start instant
 *     (presumably the maximum total time to keep polling; confirm against {@code timeIsLeft})
 * @param conditionCheck suppliers evaluated through {@code allMatch} (presumably all must return
 *     {@code true}; confirm against {@code allMatch}) to report {@code CONDITION_MET}
 * @param stopChecking suppliers evaluated through {@code allMatch} to report {@code
 *     LAUNCH_FINISHED}
 * @return {@code CONDITION_MET} if the condition check succeeded, {@code LAUNCH_FINISHED} if the
 *     stop check succeeded first, otherwise {@code TIMEOUT} once {@code timeIsLeft} returns
 *     {@code false}
 */
private static ExecutionStatus finishOrTimeout(
    Duration maxTimeoutDuration,
    Supplier<Boolean>[] conditionCheck,
    Supplier<Boolean>... stopChecking) {
  Instant start = Instant.now();
  // Thread.sleep clears the interrupt status when it throws. Remember the interrupt and
  // re-assert it on exit instead of inside the loop; re-asserting it immediately would make
  // every subsequent sleep throw at once and turn the poll into a busy loop.
  boolean interrupted = false;
  try {
    while (timeIsLeft(start, maxTimeoutDuration)) {
      LOG.debug("Checking if condition is met.");
      try {
        if (allMatch(conditionCheck)) {
          LOG.info("Condition met!");
          return ExecutionStatus.CONDITION_MET;
        }
      } catch (Exception e) {
        // Best effort: a failing condition check is treated as "not met yet" and retried.
        LOG.warn("Error happened when checking for condition", e);
      }
      LOG.info("Condition was not met yet. Checking if job is finished.");
      if (allMatch(stopChecking)) {
        LOG.info("Detected that we should stop checking.");
        return ExecutionStatus.LAUNCH_FINISHED;
      }
      LOG.info(
          "Job not finished and conditions not met. Will check again in {} seconds (total wait: {}s"
              + " of max {}s)",
          DEFAULT_CHECK_AFTER_DURATION.getSeconds(),
          Duration.between(start, Instant.now()).getSeconds(),
          maxTimeoutDuration.getSeconds());
      try {
        Thread.sleep(DEFAULT_CHECK_AFTER_DURATION.toMillis());
      } catch (InterruptedException e) {
        interrupted = true;
        LOG.warn("Wait interrupted. Checking now.");
      }
    }
    LOG.warn("Neither the condition or job completion were fulfilled on time.");
    return ExecutionStatus.TIMEOUT;
  } finally {
    if (interrupted) {
      // Hand the interruption back to the caller rather than silently discarding it.
      Thread.currentThread().interrupt();
    }
  }
}