in src/main/java/com/google/cloud/dfmetrics/pipelinemanager/MetricsManager.java [174:198]
Map<String, Double> filterAndParseMetrics(List<MetricUpdate> metrics) {
Map<String, Double> jobMetrics = new HashMap<>();
for (MetricUpdate metricUpdate : metrics) {
String metricName = metricUpdate.getName().getName();
// Handle only scalar metrics.
if (metricUpdate.getName().getOrigin().equals(DATAFLOW_SERVICEMETRICS_NAMESPACE)
&& isValidContext(metricUpdate.getName().getContext())
&& isValidNames(metricUpdate.getName())) {
if (metricUpdate.getScalar() != null) {
jobMetrics.put(metricName, ((Number) metricUpdate.getScalar()).doubleValue());
} else if (metricUpdate.getDistribution() != null) {
// currently, reporting distribution metrics as 4 separate scalar metrics
ArrayMap distributionMap = (ArrayMap) metricUpdate.getDistribution();
jobMetrics.put(
metricName + "_COUNT", ((Number) distributionMap.get("count")).doubleValue());
jobMetrics.put(metricName + "_MIN", ((Number) distributionMap.get("min")).doubleValue());
jobMetrics.put(metricName + "_MAX", ((Number) distributionMap.get("max")).doubleValue());
jobMetrics.put(metricName + "_SUM", ((Number) distributionMap.get("sum")).doubleValue());
} else if (metricUpdate.getGauge() != null) {
LOG.warn("Gauge metric {} cannot be handled.", metricName);
}
}
}
return jobMetrics;
}