in src/main/java/com/google/cloud/dfmetrics/pipelinemanager/MetricsManager.java [223:285]
public void addComputedMetrics(
Map<String, Double> metrics, String jobType, @Nullable ResourcePricing resourcePricing)
throws IOException {
Optional<TimeInterval> maybeWorkerTimerInterval;
TimeInterval jobTimerInterval = null, workerTimerInterval = null, elapsedJobTimeInterval = null;
try {
maybeWorkerTimerInterval = getWorkerTimeInterval();
jobTimerInterval = getJobTimeInterval();
elapsedJobTimeInterval = getElapsedJobTimeInterval();
} catch (ParseException e) {
throw new RuntimeException(e);
}
Optional<Double> elapsedTime =
monitoringClient()
.getJobElapsedTime(job().getProjectId(), job().getId(), elapsedJobTimeInterval);
elapsedTime.ifPresent(elapsedTimeInsec -> metrics.put("TotalElapsedTimeSec", elapsedTimeInsec));
if (maybeWorkerTimerInterval.isPresent()) {
workerTimerInterval = maybeWorkerTimerInterval.get();
metrics.put(
"TotalRunTimeSec",
(double)
Timestamps.between(
workerTimerInterval.getStartTime(), workerTimerInterval.getEndTime())
.getSeconds());
} else {
workerTimerInterval = jobTimerInterval;
}
if (resourcePricing != null) {
LOG.info("Estimating job cost..");
double estimatedJobCost = resourcePricing.estimateJobCost(jobType, metrics);
metrics.put("EstimatedJobCost", estimatedJobCost);
}
Optional<List<Double>> cpuUtilizationValues =
monitoringClient()
.getCpuUtilization(job().getProjectId(), job().getId(), workerTimerInterval);
if (cpuUtilizationValues.isPresent()) {
metrics.put("AvgCpuUtilization%", calculateAverage(cpuUtilizationValues.get()) * 100);
metrics.put("MaxCpuUtilization%", calculateMaximum(cpuUtilizationValues.get()) * 100);
}
if (jobType.equals("JOB_TYPE_STREAMING")) {
Optional<List<Double>> dataFreshnessValues =
monitoringClient()
.getDataFreshness(job().getProjectId(), job().getId(), workerTimerInterval);
if (dataFreshnessValues.isPresent()) {
metrics.put("AvgDataFreshnessSec", calculateAverage(dataFreshnessValues.get()));
metrics.put("MaxDataFreshnessSec", calculateMaximum(dataFreshnessValues.get()));
}
Optional<List<Double>> systemLatencyValues =
monitoringClient()
.getSystemLatency(job().getProjectId(), job().getId(), workerTimerInterval);
if (systemLatencyValues.isPresent()) {
metrics.put("AvgSystemLatencySec", calculateAverage(systemLatencyValues.get()));
metrics.put("MaxSystemLatencySec", calculateMaximum(systemLatencyValues.get()));
}
}
}