in src/main/java/com/google/cloud/dfmetrics/commands/TemplateLauncherCommand.java [111:180]
public void run() {
LOG.info("Launching the dataflow job in project:{}", jobConfig.projectId());
try {
Job dataflowJob =
templateClient.launch(jobConfig.projectId(), jobConfig.location(), templateConfig);
LOG.info(
"Launched dataflow job with jobid:{} in project:{}",
dataflowJob.getId(),
dataflowJob.getProjectId());
DataflowJobManager jobManager =
DataflowJobManager.builder(this.dataflowClient, dataflowJob)
.setMaxTimeOut(Duration.ofMinutes(jobConfig.timeoutInMinutes()))
.build();
jobManager.printJobResponse();
// Wait until the job is active to get more information
Job job = jobManager.waitUntilActive();
// Refresh the job manager with latest job view
jobManager = jobManager.withJob(job);
// Get the job info
JobInfo jobInfo = getLaunchedJobInfo(jobManager, jobConfig, templateConfig);
LOG.info("Waiting for the job to finish or timeout");
DataflowJobManager.ExecutionStatus executionStatus =
jobInfo.isBatchJob()
? jobManager.waitUntilDoneOrCancelAfterTimeOut()
: jobManager.waitForDurationAndCancelJob();
LOG.info("Completed waiting for the job");
if (executionStatus == DataflowJobManager.ExecutionStatus.TIMEOUT) {
ImmutableSet<JobState> terminalJobStates =
ImmutableSet.<JobState>builder()
.addAll(JobState.DONE_STATES)
.addAll(JobState.FINISHING_STATES)
.build();
String expectedJobStates =
terminalJobStates.stream().map(JobState::toString).collect(Collectors.joining(","));
throw new RuntimeException(
String.format("Job failed to reach one of the terminal states: %s", expectedJobStates));
}
LOG.info("Fetching job metrics..");
MonitoringClient monitoringClient =
MonitoringClient.builder()
.setCredentials(
MetricsCollectorUtils.getCredentialsFromDataflowClient(this.dataflowClient))
.build();
MetricsManager metricsManager =
MetricsManager.builder(this.dataflowClient, jobManager.job(), monitoringClient)
.setMonitoringMetricsWaitDuration(Duration.ofMinutes(3))
.build();
Map<String, Double> metrics =
metricsManager.getMetrics(jobInfo.jobType(), jobConfig.resourcePricing());
LOG.info("Storing the results in the output store..");
this.outputStore.load(jobInfo, metrics);
LOG.info("Completed all operations");
} catch (IOException e) {
throw new RuntimeException(e);
}
}