List getBatches()

in spectator-reg-atlas/src/main/java/com/netflix/spectator/atlas/AtlasRegistry.java [379:422]


  List<RollupPolicy.Result> getBatches(long t) {
    final int n = atlasMeasurements.size();
    final List<RollupPolicy.Result> batches = new ArrayList<>(n / batchSize + 1);
    timePublishTask("getBatches", "atlasMeasurements", () -> {
      debugRegistry.distributionSummary("spectator.registrySize").record(n);
      List<Measurement> input = new ArrayList<>(n);
      Iterator<Map.Entry<Id, Consolidator>> it = atlasMeasurements.entrySet().iterator();
      while (it.hasNext()) {
        Map.Entry<Id, Consolidator> entry = it.next();
        Consolidator consolidator = entry.getValue();

        // Ensure it has been updated for this interval
        consolidator.update(t, Double.NaN);

        // Add the measurement to the list
        double v = consolidator.value(t);
        if (!Double.isNaN(v)) {
          input.add(new Measurement(entry.getKey(), t, v));
        }

        // Clean up if there is no longer a need to preserve the state for this id
        if (consolidator.isEmpty()) {
          it.remove();
        }
      }

      List<RollupPolicy.Result> results = rollupPolicy.apply(input);
      int rollupSize = results.stream().mapToInt(r -> r.measurements().size()).sum();
      debugRegistry.distributionSummary("spectator.rollupResultSize").record(rollupSize);

      // Rollup policy can result multiple sets of metrics with different common tags. Batches
      // are computed using sets with the same common tags. This avoids needing to merge the
      // common tags into the ids and the larger payloads that would result from replicating them
      // on all measurements.
      for (RollupPolicy.Result result : results) {
        List<Measurement> ms = result.measurements();
        for (int i = 0; i < ms.size(); i += batchSize) {
          List<Measurement> batch = ms.subList(i, Math.min(ms.size(), i + batchSize));
          batches.add(new RollupPolicy.Result(result.commonTags(), batch));
        }
      }
    });
    return batches;
  }