in src/main/java/com/google/cloud/dfmetrics/pipelinemanager/DataflowJobManager.java [214:242]
/**
 * Blocks until the job is no longer in one of {@code JobState.PENDING_STATES}, re-checking its
 * state every 15 seconds.
 *
 * <p>An interrupt received while sleeping does not abort the wait: it triggers an immediate state
 * check and polling continues. Because {@code sleep} clears the thread's interrupt status when it
 * throws, the interrupt is remembered and the status is restored before this method exits (by
 * return or by exception) so callers can still observe it.
 *
 * <p>NOTE(review): the progress log reports {@code maxTimeOut()}, but this loop does not enforce
 * it, so the wait is unbounded while the job stays pending — confirm whether that is intended.
 *
 * @return the most recently fetched job view, obtained once the job is no longer pending
 * @throws IOException propagated from the job lookups performed while polling
 * @throws RuntimeException if the job is in the {@code FAILED} state once it stops being pending
 */
public Job waitUntilActive() throws IOException {
  Job job = getLatestJobView();
  JobState state = handleJobState(job);
  Instant start = Instant.now();
  LOG.info("Current state:{}", state);

  // Tracks whether an interrupt was consumed by sleep() so it can be re-asserted on exit.
  boolean interrupted = false;
  try {
    while (JobState.PENDING_STATES.contains(state)) {
      LOG.info(
          "Job still pending. Will check again in 15 seconds. (total wait: {}s of max {}s)",
          Duration.between(start, Instant.now()).getSeconds(),
          this.maxTimeOut().getSeconds());
      try {
        TimeUnit.SECONDS.sleep(15);
      } catch (InterruptedException e) {
        // Deliberately keep waiting, but do not lose the interrupt: the flag is cleared by
        // sleep(), and re-setting it here would make every subsequent sleep() fail immediately.
        interrupted = true;
        LOG.warn("Wait interrupted. Checking now.");
      }
      LOG.info("Calling state check:");
      job = getLatestJobView();
      state = handleJobState(job);
      LOG.info("Current state:{}", state);
    }
  } finally {
    if (interrupted) {
      Thread.currentThread().interrupt();
    }
  }

  if (state == JobState.FAILED) {
    throw new RuntimeException(
        String.format(
            "The job failed before launch! For more "
                + "information please check if the job log for Job ID: %s, under project %s.",
            job().getId(), job().getProjectId()));
  }
  return job;
}