in ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/TimelineMetricsIgniteCache.java [212:261]
public void putMetrics(Collection<TimelineMetric> elements) {
Map<String, TimelineMetricHostMetadata> hostMetadata = metricMetadataManager.getHostedAppsCache();
for (TimelineMetric metric : elements) {
if (shouldBeSkipped(metric.getMetricName())) {
if (LOG.isDebugEnabled()) {
LOG.debug(String.format("Skipping %s metric from being aggregated", metric.getMetricName()));
}
continue;
}
List<Long[]> timeSlices = getTimeSlices(getRoundedCheckPointTimeMillis(metric.getMetricValues().firstKey(), cacheSliceIntervalMillis), metric.getMetricValues().lastKey(), cacheSliceIntervalMillis);
Map<TimelineClusterMetric, Double> slicedClusterMetrics = sliceFromTimelineMetric(metric, timeSlices, interpolationEnabled);
if (slicedClusterMetrics != null) {
for (Map.Entry<TimelineClusterMetric, Double> metricDoubleEntry : slicedClusterMetrics.entrySet()) {
MetricClusterAggregate newMetricClusterAggregate = new MetricClusterAggregate(
metricDoubleEntry.getValue(), 1, null, metricDoubleEntry.getValue(), metricDoubleEntry.getValue());
//put app metric into cache
putMetricIntoCache(metricDoubleEntry.getKey(), newMetricClusterAggregate);
if (hostMetadata != null) {
//calculate app host metric
if (metric.getAppId().equalsIgnoreCase(HOST_APP_ID)) {
// Candidate metric, update app aggregates
if (hostMetadata.containsKey(metric.getHostName())) {
updateAppAggregatesFromHostMetric(metricDoubleEntry.getKey(), newMetricClusterAggregate, hostMetadata.get(metric.getHostName()));
}
} else {
// Build the hostedapps map if not a host metric
// Check app candidacy for host aggregation
//TODO better to lock TimelineMetricHostMetadata instance to avoid dataloss, but generally the data could be lost only during initial collector start
if (appIdsToAggregate.contains(metric.getAppId())) {
TimelineMetricHostMetadata timelineMetricHostMetadata = hostMetadata.get(metric.getHostName());
ConcurrentHashMap<String, String> appIdsMap;
if (timelineMetricHostMetadata == null) {
appIdsMap = new ConcurrentHashMap<>();
hostMetadata.put(metric.getHostName(), new TimelineMetricHostMetadata(appIdsMap));
} else {
appIdsMap = timelineMetricHostMetadata.getHostedApps();
}
if (!appIdsMap.containsKey(metric.getAppId())) {
appIdsMap.put(metric.getAppId(), metric.getAppId());
LOG.info("Adding appId to hosted apps: appId = " +
metric.getAppId() + ", hostname = " + metric.getHostName());
}
}
}
}
}
}
}
}