Map filterAndParseMetrics()

in src/main/java/com/google/cloud/dfmetrics/pipelinemanager/MetricsManager.java [174:198]


  Map<String, Double> filterAndParseMetrics(List<MetricUpdate> metrics) {
    Map<String, Double> jobMetrics = new HashMap<>();
    for (MetricUpdate metricUpdate : metrics) {
      String metricName = metricUpdate.getName().getName();
      // Handle only scalar metrics.
      if (metricUpdate.getName().getOrigin().equals(DATAFLOW_SERVICEMETRICS_NAMESPACE)
          && isValidContext(metricUpdate.getName().getContext())
          && isValidNames(metricUpdate.getName())) {
        if (metricUpdate.getScalar() != null) {
          jobMetrics.put(metricName, ((Number) metricUpdate.getScalar()).doubleValue());
        } else if (metricUpdate.getDistribution() != null) {
          // currently, reporting distribution metrics as 4 separate scalar metrics
          ArrayMap distributionMap = (ArrayMap) metricUpdate.getDistribution();
          jobMetrics.put(
              metricName + "_COUNT", ((Number) distributionMap.get("count")).doubleValue());
          jobMetrics.put(metricName + "_MIN", ((Number) distributionMap.get("min")).doubleValue());
          jobMetrics.put(metricName + "_MAX", ((Number) distributionMap.get("max")).doubleValue());
          jobMetrics.put(metricName + "_SUM", ((Number) distributionMap.get("sum")).doubleValue());
        } else if (metricUpdate.getGauge() != null) {
          LOG.warn("Gauge metric {} cannot be handled.", metricName);
        }
      }
    }
    return jobMetrics;
  }