public void addComputedMetrics()

in src/main/java/com/google/cloud/dfmetrics/pipelinemanager/MetricsManager.java [223:285]


  public void addComputedMetrics(
      Map<String, Double> metrics, String jobType, @Nullable ResourcePricing resourcePricing)
      throws IOException {
    Optional<TimeInterval> maybeWorkerTimerInterval;
    TimeInterval jobTimerInterval = null, workerTimerInterval = null, elapsedJobTimeInterval = null;
    try {
      maybeWorkerTimerInterval = getWorkerTimeInterval();
      jobTimerInterval = getJobTimeInterval();
      elapsedJobTimeInterval = getElapsedJobTimeInterval();
    } catch (ParseException e) {
      throw new RuntimeException(e);
    }

    Optional<Double> elapsedTime =
        monitoringClient()
            .getJobElapsedTime(job().getProjectId(), job().getId(), elapsedJobTimeInterval);

    elapsedTime.ifPresent(elapsedTimeInsec -> metrics.put("TotalElapsedTimeSec", elapsedTimeInsec));

    if (maybeWorkerTimerInterval.isPresent()) {
      workerTimerInterval = maybeWorkerTimerInterval.get();
      metrics.put(
          "TotalRunTimeSec",
          (double)
              Timestamps.between(
                      workerTimerInterval.getStartTime(), workerTimerInterval.getEndTime())
                  .getSeconds());
    } else {
      workerTimerInterval = jobTimerInterval;
    }

    if (resourcePricing != null) {
      LOG.info("Estimating job cost..");
      double estimatedJobCost = resourcePricing.estimateJobCost(jobType, metrics);
      metrics.put("EstimatedJobCost", estimatedJobCost);
    }

    Optional<List<Double>> cpuUtilizationValues =
        monitoringClient()
            .getCpuUtilization(job().getProjectId(), job().getId(), workerTimerInterval);
    if (cpuUtilizationValues.isPresent()) {
      metrics.put("AvgCpuUtilization%", calculateAverage(cpuUtilizationValues.get()) * 100);
      metrics.put("MaxCpuUtilization%", calculateMaximum(cpuUtilizationValues.get()) * 100);
    }

    if (jobType.equals("JOB_TYPE_STREAMING")) {
      Optional<List<Double>> dataFreshnessValues =
          monitoringClient()
              .getDataFreshness(job().getProjectId(), job().getId(), workerTimerInterval);
      if (dataFreshnessValues.isPresent()) {
        metrics.put("AvgDataFreshnessSec", calculateAverage(dataFreshnessValues.get()));
        metrics.put("MaxDataFreshnessSec", calculateMaximum(dataFreshnessValues.get()));
      }

      Optional<List<Double>> systemLatencyValues =
          monitoringClient()
              .getSystemLatency(job().getProjectId(), job().getId(), workerTimerInterval);
      if (systemLatencyValues.isPresent()) {
        metrics.put("AvgSystemLatencySec", calculateAverage(systemLatencyValues.get()));
        metrics.put("MaxSystemLatencySec", calculateMaximum(systemLatencyValues.get()));
      }
    }
  }