in ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/aggregators/TimelineMetricClusterAggregatorSecond.java [152:213]
/**
 * Builds the cluster-level aggregates for the rows of {@code rs}.
 *
 * <p>Rows are read in result-set order. Rows for which the read helper returns
 * {@code null} are skipped. Consecutive rows whose metrics satisfy
 * {@code equalsExceptTime} are merged into a single {@link TimelineMetric}
 * before it is handed to {@code processAggregateClusterMetrics}; this avoids
 * rows that belong to the same host being counted as coming from different
 * hosts. For every appId the largest host count returned by
 * {@code processAggregateClusterMetrics} is remembered.
 *
 * <p>After the result set is drained, the entries of
 * {@code appAggregator.getAggregateClusterMetrics()} are added to the result
 * and {@code processLiveAppCountMetrics} is invoked with the per-appId host
 * counts and the element at index 1 of the last time slice (presumably the
 * slice end time -- confirm against the code that builds the slices).
 *
 * @param rs result set positioned before its first row; it is advanced until
 *     {@code next()} returns {@code false} and is not closed here
 * @param timeSlices slices passed through to
 *     {@code processAggregateClusterMetrics}; must not be empty
 * @return map holding the aggregates produced for this result set
 * @throws SQLException if advancing the result set fails, or propagated from
 *     the read helper
 * @throws IOException propagated from the calls made while reading the rows
 * @throws IndexOutOfBoundsException if {@code timeSlices} is empty
 */
Map<TimelineClusterMetric, MetricClusterAggregate> aggregateMetricsFromResultSet(ResultSet rs, List<Long[]> timeSlices)
    throws SQLException, IOException {
  Map<TimelineClusterMetric, MetricClusterAggregate> aggregateClusterMetrics = new HashMap<>();
  Map<String, MutableInt> hostedAppCounter = new HashMap<>();

  // Metric currently being accumulated; stays null until the first usable row.
  TimelineMetric metric = null;

  // A single loop drives the cursor so that next() is never called again
  // after it has returned false (JDBC leaves that case vendor-specific for
  // forward-only result sets).
  while (rs.next()) {
    TimelineMetric nextMetric = timelineMetricReadHelper.getTimelineMetricFromResultSet(rs);
    if (nextMetric == null) {
      continue;
    }
    if (metric == null) {
      // First usable row: start accumulating.
      metric = nextMetric;
    } else if (metric.equalsExceptTime(nextMetric)) {
      // If rows belong to same host combine them before slicing. This
      // avoids issues across rows that belong to same hosts but get
      // counted as coming from different ones.
      metric.addMetricValues(nextMetric.getMetricValues());
    } else {
      // A different metric starts here: process the accumulated one first.
      int numHosts = processAggregateClusterMetrics(aggregateClusterMetrics, metric, timeSlices);
      recordMaxHostCount(hostedAppCounter, metric.getAppId(), numHosts);
      metric = nextMetric;
    }
  }

  // Process last metric
  if (metric != null) {
    int numHosts = processAggregateClusterMetrics(aggregateClusterMetrics, metric, timeSlices);
    recordMaxHostCount(hostedAppCounter, metric.getAppId(), numHosts);
  }

  // Add app level aggregates to save
  aggregateClusterMetrics.putAll(appAggregator.getAggregateClusterMetrics());

  // Add liveHosts per AppId metrics.
  long timestamp = timeSlices.get(timeSlices.size() - 1)[1];
  processLiveAppCountMetrics(aggregateClusterMetrics, hostedAppCounter, timestamp);

  return aggregateClusterMetrics;
}

/**
 * Stores {@code numHosts} for {@code appId} when no count is recorded yet or
 * when the recorded count is smaller, so the map keeps the largest value seen.
 */
private void recordMaxHostCount(Map<String, MutableInt> hostedAppCounter, String appId, int numHosts) {
  MutableInt recorded = hostedAppCounter.get(appId);
  if (recorded == null || recorded.intValue() < numHosts) {
    hostedAppCounter.put(appId, new MutableInt(numHosts));
  }
}