in spectator-reg-atlas/src/main/java/com/netflix/spectator/atlas/Rollups.java [47:104]
static RollupPolicy fromRules(Map<String, String> commonTags, List<RollupPolicy.Rule> rules) {
QueryIndex<RollupPolicy.Rule> index = QueryIndex.newInstance(new NoopRegistry());
for (RollupPolicy.Rule rule : rules) {
// Apply common tags to simplify the query and avoid needing to merge with the ids
// before evaluating the query
Query query = Parser.parseQuery(rule.query()).simplify(commonTags);
index.add(query, rule);
}
return ms -> {
// Common tags -> aggregated measurements
Map<Map<String, String>, Map<Id, Aggregator>> aggregates = new HashMap<>();
for (Measurement m : ms) {
List<RollupPolicy.Rule> matches = index.findMatches(m.id());
if (matches.isEmpty()) {
// No matches for the id, but we still need to treat as an aggregate because
// rollup on another id could cause a collision
Map<Id, Aggregator> idMap = aggregates.computeIfAbsent(commonTags, k -> new HashMap<>());
updateAggregate(idMap, m.id(), m);
} else {
// Skip measurement if one of the rules indicates it should be dropped
if (shouldDrop(matches)) {
continue;
}
// For matching rules, find dimensions from common tags and others that are part
// of the id
Set<String> commonDimensions = new HashSet<>();
Set<String> otherDimensions = new HashSet<>();
for (RollupPolicy.Rule rule : matches) {
for (String dimension : rule.rollup()) {
if (commonTags.containsKey(dimension)) {
commonDimensions.add(dimension);
} else {
otherDimensions.add(dimension);
}
}
}
// Perform rollup by removing the dimensions
Map<String, String> tags = commonDimensions.isEmpty()
? commonTags
: rollup(commonTags, commonDimensions);
Id id = otherDimensions.isEmpty()
? m.id()
: m.id().filterByKey(k -> !otherDimensions.contains(k));
Map<Id, Aggregator> idMap = aggregates.computeIfAbsent(tags, k -> new HashMap<>());
updateAggregate(idMap, id, m);
}
}
// Convert to final result type
List<RollupPolicy.Result> results = new ArrayList<>(aggregates.size());
for (Map.Entry<Map<String, String>, Map<Id, Aggregator>> entry : aggregates.entrySet()) {
results.add(new RollupPolicy.Result(entry.getKey(), toMeasurements(entry.getValue())));
}
return results;
};
}