public EvalPayload eval()

in spectator-reg-atlas/src/main/java/com/netflix/spectator/atlas/impl/Evaluator.java [175:238]


  /**
   * Evaluates every subscription whose frequency evenly divides {@code timestamp} and
   * gathers the resulting metrics into a single payload.
   *
   * <p>For each matching subscription, every consolidator in its measurements is updated
   * with {@code NaN} for the timestamp and then read. A non-NaN value is either emitted
   * directly with an {@code atlas.aggr} tag (when delayed gauge aggregation is enabled,
   * the expression is accumulating, and the consolidator reports being a gauge) or fed
   * to the expression's aggregator. Values for which no tags could be produced are
   * dropped. Consolidators that report being empty are removed from the subscription's
   * measurements.
   *
   * @param timestamp
   *     Timestamp to evaluate. A subscription is skipped unless
   *     {@code timestamp % frequency == 0}.
   * @param parallel
   *     Passed to {@code StreamSupport.stream} to choose between a parallel and a
   *     sequential stream over the subscriptions.
   * @return
   *     Payload holding {@code timestamp} and a copy of the collected metrics.
   */
  public EvalPayload eval(long timestamp, boolean parallel) {
    // Concurrent collection because the stream below may be parallel.
    Collection<EvalPayload.Metric> collected = new ConcurrentLinkedQueue<>();
    StreamSupport.stream(subscriptions.values().spliterator(), parallel).forEach(sub -> {
      final String id = sub.subscription.getId();
      final long frequency = sub.subscription.getFrequency();

      // Only subscriptions aligned with this timestamp are evaluated.
      if (timestamp % frequency != 0) {
        return;
      }

      LOGGER.debug("evaluating subscription: {}: {}", timestamp, sub.subscription);
      final DataExpr dataExpr = sub.subscription.dataExpr();
      final boolean deferGauges = delayGaugeAggregation && dataExpr.isAccumulating();
      final DataExpr.Aggregator agg = dataExpr.aggregator(false);

      for (Iterator<Map.Entry<Id, Consolidator>> iter =
           sub.measurements.entrySet().iterator(); iter.hasNext();) {
        final Map.Entry<Id, Consolidator> measurement = iter.next();
        final Consolidator c = measurement.getValue();
        c.update(timestamp, Double.NaN);
        final double value = c.value(timestamp);

        if (!Double.isNaN(value)) {
          Map<String, String> tags = null;
          if (dataExpr instanceof DataExpr.GroupBy) {
            // Only the tags named by the expression are needed for aggregation, so the
            // remaining tags on the data are not considered.
            final DataExpr.GroupBy groupBy = (DataExpr.GroupBy) dataExpr;
            final Set<String> groupKeys = groupBy.keys();
            final Map<String, String> candidate = idMapper.apply(measurement.getKey(), groupKeys);
            putCommonTags(candidate, groupKeys);
            // A datapoint lacking any of the grouping keys is ignored, which leaves
            // tags as null.
            if (candidate.size() >= groupKeys.size()) {
              candidate.putAll(groupBy.aggregateFunction().queryTags());
              tags = candidate;
            }
          } else if (dataExpr instanceof DataExpr.AggregateFunction) {
            final DataExpr.AggregateFunction fn = (DataExpr.AggregateFunction) dataExpr;
            tags = new HashMap<>(fn.resultTags(fn.queryTags()));
          }

          final boolean emitDirectly = deferGauges && c.isGauge();
          if (tags != null) {
            if (emitDirectly) {
              tags.put("atlas.aggr", idHash(measurement.getKey()));
              final double contribution;
              if (dataExpr.isCount()) {
                contribution = 1.0;
              } else {
                contribution = value;
              }
              collected.add(new EvalPayload.Metric(id, tags, contribution));
            } else {
              final TagsValuePair datapoint = new TagsValuePair(tags, value);
              agg.update(datapoint);
              LOGGER.trace("aggregating: {}: {}", timestamp, datapoint);
            }
          }
        }

        if (c.isEmpty()) {
          iter.remove();
        }
      }

      for (TagsValuePair aggregated : agg.result()) {
        LOGGER.trace("result: {}: {}", timestamp, aggregated);
        collected.add(new EvalPayload.Metric(id, aggregated.tags(), aggregated.value()));
      }
    });

    return new EvalPayload(timestamp, new ArrayList<>(collected));
  }