public void run()

in src/main/java/com/google/cloud/dfmetrics/commands/TemplateLauncherCommand.java [111:180]


  /**
   * Launches the configured template as a Dataflow job, waits for it to finish or time out,
   * collects the job metrics and loads them into the output store.
   *
   * @throws RuntimeException if the wait ends with a {@code TIMEOUT} execution status, or as a
   *     wrapper around any {@link IOException} raised by the steps above
   */
  public void run() {
    LOG.info("Launching the dataflow job in project:{}", jobConfig.projectId());
    try {
      Job launchedJob =
          templateClient.launch(jobConfig.projectId(), jobConfig.location(), templateConfig);
      LOG.info(
          "Launched dataflow job with jobid:{} in project:{}",
          launchedJob.getId(),
          launchedJob.getProjectId());

      DataflowJobManager jobManager =
          DataflowJobManager.builder(dataflowClient, launchedJob)
              .setMaxTimeOut(Duration.ofMinutes(jobConfig.timeoutInMinutes()))
              .build();
      jobManager.printJobResponse();

      // Block until the job is active, then rebuild the manager around that fresher job view so
      // the steps below work with the latest job details.
      jobManager = jobManager.withJob(jobManager.waitUntilActive());

      JobInfo jobInfo = getLaunchedJobInfo(jobManager, jobConfig, templateConfig);

      LOG.info("Waiting for the job to finish or timeout");
      // Batch jobs and non-batch jobs use different wait/cancel calls on the manager.
      DataflowJobManager.ExecutionStatus status;
      if (jobInfo.isBatchJob()) {
        status = jobManager.waitUntilDoneOrCancelAfterTimeOut();
      } else {
        status = jobManager.waitForDurationAndCancelJob();
      }
      LOG.info("Completed waiting for the job");

      if (status == DataflowJobManager.ExecutionStatus.TIMEOUT) {
        // List every done/finishing state in the error so the failure message is self-explanatory.
        String joinedStates =
            ImmutableSet.<JobState>builder()
                .addAll(JobState.DONE_STATES)
                .addAll(JobState.FINISHING_STATES)
                .build()
                .stream()
                .map(state -> state.toString())
                .collect(Collectors.joining(","));
        throw new RuntimeException(
            String.format("Job failed to reach one of the terminal states: %s", joinedStates));
      }

      LOG.info("Fetching job metrics..");
      // The monitoring client reuses the credentials obtained from the Dataflow client.
      MonitoringClient monitoring =
          MonitoringClient.builder()
              .setCredentials(MetricsCollectorUtils.getCredentialsFromDataflowClient(dataflowClient))
              .build();

      Duration monitoringWait = Duration.ofMinutes(3);
      MetricsManager manager =
          MetricsManager.builder(dataflowClient, jobManager.job(), monitoring)
              .setMonitoringMetricsWaitDuration(monitoringWait)
              .build();
      Map<String, Double> collectedMetrics =
          manager.getMetrics(jobInfo.jobType(), jobConfig.resourcePricing());

      LOG.info("Storing the results in the output store..");
      outputStore.load(jobInfo, collectedMetrics);

      LOG.info("Completed all operations");
    } catch (IOException ioFailure) {
      // Surface checked I/O failures as unchecked while keeping the original cause attached.
      throw new RuntimeException(ioFailure);
    }
  }