Map aggregateMetricsFromResultSet()

in ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/aggregators/TimelineMetricClusterAggregatorSecond.java [152:213]


  Map<TimelineClusterMetric, MetricClusterAggregate> aggregateMetricsFromResultSet(ResultSet rs, List<Long[]> timeSlices)
    throws SQLException, IOException {
    Map<TimelineClusterMetric, MetricClusterAggregate> aggregateClusterMetrics =
      new HashMap<TimelineClusterMetric, MetricClusterAggregate>();

    TimelineMetric metric = null;
    Map<String, MutableInt> hostedAppCounter = new HashMap<>();
    if (rs.next()) {
      metric = timelineMetricReadHelper.getTimelineMetricFromResultSet(rs);
      while (metric == null && rs.next()) {
        metric = timelineMetricReadHelper.getTimelineMetricFromResultSet(rs);
      }

      // Call slice after all rows for a host are read
      while (rs.next()) {
        TimelineMetric nextMetric = timelineMetricReadHelper.getTimelineMetricFromResultSet(rs);
        // If rows belong to same host combine them before slicing. This
        // avoids issues across rows that belong to same hosts but get
        // counted as coming from different ones.
        if (nextMetric == null) {
          continue;
        }

        if (metric.equalsExceptTime(nextMetric)) {
          metric.addMetricValues(nextMetric.getMetricValues());
        } else {
          // Process the current metric
          int numHosts = processAggregateClusterMetrics(aggregateClusterMetrics, metric, timeSlices);
          if (!hostedAppCounter.containsKey(metric.getAppId())) {
            hostedAppCounter.put(metric.getAppId(), new MutableInt(numHosts));
          } else {
            int currentHostCount = hostedAppCounter.get(metric.getAppId()).intValue();
            if (currentHostCount < numHosts) {
              hostedAppCounter.put(metric.getAppId(), new MutableInt(numHosts));
            }
          }
          metric = nextMetric;
        }
      }
    }
    // Process last metric
    if (metric != null) {
      int numHosts = processAggregateClusterMetrics(aggregateClusterMetrics, metric, timeSlices);
      if (!hostedAppCounter.containsKey(metric.getAppId())) {
        hostedAppCounter.put(metric.getAppId(), new MutableInt(numHosts));
      } else {
        int currentHostCount = hostedAppCounter.get(metric.getAppId()).intValue();
        if (currentHostCount < numHosts) {
          hostedAppCounter.put(metric.getAppId(), new MutableInt(numHosts));
        }
      }
    }

    // Add app level aggregates to save
    aggregateClusterMetrics.putAll(appAggregator.getAggregateClusterMetrics());

    // Add liveHosts per AppId metrics.
    long timestamp = timeSlices.get(timeSlices.size() - 1)[1];
    processLiveAppCountMetrics(aggregateClusterMetrics, hostedAppCounter, timestamp);

    return aggregateClusterMetrics;
  }