in spectator-reg-atlas/src/main/java/com/netflix/spectator/atlas/impl/Evaluator.java [175:238]
public EvalPayload eval(long timestamp, boolean parallel) {
  // Evaluates every subscription whose frequency evenly divides `timestamp` and collects the
  // resulting metrics into a single payload stamped with that timestamp. `parallel` is passed
  // straight to the stream over the subscriptions.
  //
  // The sink is a concurrent queue because the lambda below appends to it and the stream may
  // be processed in parallel.
  Collection<EvalPayload.Metric> metrics = new ConcurrentLinkedQueue<>();
  StreamSupport.stream(subscriptions.values().spliterator(), parallel).forEach(sub -> {
    final String subId = sub.subscription.getId();
    final long step = sub.subscription.getFrequency();
    if (timestamp % step != 0) {
      // This timestamp is not on a step boundary for the subscription; nothing to report.
      return;
    }

    LOGGER.debug("evaluating subscription: {}: {}", timestamp, sub.subscription);
    final DataExpr expr = sub.subscription.dataExpr();
    final boolean deferGaugeAggr = delayGaugeAggregation && expr.isAccumulating();
    final DataExpr.Aggregator aggregator = expr.aggregator(false);

    for (Iterator<Map.Entry<Id, Consolidator>> iter = sub.measurements.entrySet().iterator();
        iter.hasNext();) {
      final Map.Entry<Id, Consolidator> measurement = iter.next();
      final Consolidator consolidator = measurement.getValue();

      // NOTE(review): updating with NaN presumably just advances the consolidator to this
      // timestamp without contributing a value -- confirm against Consolidator.
      consolidator.update(timestamp, Double.NaN);
      final double value = consolidator.value(timestamp);

      if (!Double.isNaN(value)) {
        final Id id = measurement.getKey();

        // Stays null when the datapoint must be skipped: either a grouping key is missing or
        // the expression is neither a group-by nor an aggregate function.
        Map<String, String> tags = null;
        if (expr instanceof DataExpr.GroupBy) {
          // Only the tags named by the expression matter for an aggregation, so the id is
          // mapped using just the grouping keys rather than its full tag set.
          final DataExpr.GroupBy groupBy = (DataExpr.GroupBy) expr;
          final Set<String> groupKeys = groupBy.keys();
          final Map<String, String> grouped = idMapper.apply(id, groupKeys);
          putCommonTags(grouped, groupKeys);
          // A datapoint lacking any of the grouping keys is ignored for a group by.
          if (grouped.size() >= groupKeys.size()) {
            grouped.putAll(groupBy.aggregateFunction().queryTags());
            tags = grouped;
          }
        } else if (expr instanceof DataExpr.AggregateFunction) {
          final DataExpr.AggregateFunction aggrFunction = (DataExpr.AggregateFunction) expr;
          tags = new HashMap<>(aggrFunction.resultTags(aggrFunction.queryTags()));
        }

        final boolean perGauge = deferGaugeAggr && consolidator.isGauge();
        if (tags != null) {
          if (perGauge) {
            // Gauge aggregation is delayed: emit this measurement on its own, marked with a
            // hash of its id, instead of folding it into the local aggregator.
            tags.put("atlas.aggr", idHash(id));
            final double acc;
            if (expr.isCount()) {
              acc = 1.0;
            } else {
              acc = value;
            }
            metrics.add(new EvalPayload.Metric(subId, tags, acc));
          } else {
            final TagsValuePair datapoint = new TagsValuePair(tags, value);
            aggregator.update(datapoint);
            LOGGER.trace("aggregating: {}: {}", timestamp, datapoint);
          }
        }
      }

      // Drop consolidators that report themselves empty so the map does not keep them around.
      if (consolidator.isEmpty()) {
        iter.remove();
      }
    }

    // Publish whatever the aggregator accumulated for this subscription.
    for (TagsValuePair aggregated : aggregator.result()) {
      LOGGER.trace("result: {}: {}", timestamp, aggregated);
      metrics.add(new EvalPayload.Metric(subId, aggregated.tags(), aggregated.value()));
    }
  });
  return new EvalPayload(timestamp, new ArrayList<>(metrics));
}