in spectator-reg-atlas/src/main/java/com/netflix/spectator/atlas/AtlasRegistry.java [379:422]
List<RollupPolicy.Result> getBatches(long t) {
final int n = atlasMeasurements.size();
final List<RollupPolicy.Result> batches = new ArrayList<>(n / batchSize + 1);
timePublishTask("getBatches", "atlasMeasurements", () -> {
debugRegistry.distributionSummary("spectator.registrySize").record(n);
List<Measurement> input = new ArrayList<>(n);
Iterator<Map.Entry<Id, Consolidator>> it = atlasMeasurements.entrySet().iterator();
while (it.hasNext()) {
Map.Entry<Id, Consolidator> entry = it.next();
Consolidator consolidator = entry.getValue();
// Ensure it has been updated for this interval
consolidator.update(t, Double.NaN);
// Add the measurement to the list
double v = consolidator.value(t);
if (!Double.isNaN(v)) {
input.add(new Measurement(entry.getKey(), t, v));
}
// Clean up if there is no longer a need to preserve the state for this id
if (consolidator.isEmpty()) {
it.remove();
}
}
List<RollupPolicy.Result> results = rollupPolicy.apply(input);
int rollupSize = results.stream().mapToInt(r -> r.measurements().size()).sum();
debugRegistry.distributionSummary("spectator.rollupResultSize").record(rollupSize);
// Rollup policy can result multiple sets of metrics with different common tags. Batches
// are computed using sets with the same common tags. This avoids needing to merge the
// common tags into the ids and the larger payloads that would result from replicating them
// on all measurements.
for (RollupPolicy.Result result : results) {
List<Measurement> ms = result.measurements();
for (int i = 0; i < ms.size(); i += batchSize) {
List<Measurement> batch = ms.subList(i, Math.min(ms.size(), i + batchSize));
batches.add(new RollupPolicy.Result(result.commonTags(), batch));
}
}
});
return batches;
}