in oap-server/analyzer/meter-analyzer/src/main/java/org/apache/skywalking/oap/meter/analyzer/Analyzer.java [119:197]
public void analyse(final ImmutableMap<String, SampleFamily> sampleFamilies) {
    // Pick out the sample families named in `samples`; names with no entry in the
    // incoming map are dropped.
    Map<String, SampleFamily> selected = samples.stream()
        .map(name -> Tuple.of(name, sampleFamilies.get(name)))
        .filter(pair -> pair._2 != null)
        .collect(toImmutableMap(pair -> pair._1, pair -> pair._2));
    if (selected.isEmpty()) {
        // None of the referenced sample families is present, so there is nothing to evaluate.
        if (log.isDebugEnabled()) {
            log.debug("{} is ignored due to the lack of {}", expression, samples);
        }
        return;
    }

    // The filter expression is optional; when configured it replaces the expression input.
    if (filterExpression != null) {
        selected = filterExpression.filter(selected);
    }

    final Result outcome = expression.run(selected);
    if (!outcome.isSuccess()) {
        return;
    }

    // The running context of the result holds the samples grouped per meter entity.
    final SampleFamily.RunningContext runningContext = outcome.getData().context;
    final Map<MeterEntity, Sample[]> samplesByEntity = runningContext.getMeterSamples();

    samplesByEntity.forEach((entity, entitySamples) -> {
        generateTraffic(entity);
        switch (metricType) {
            case single:
                // Only the first sample contributes to a single-value metric.
                AcceptableValue<Long> longMetric = meterSystem.buildMetrics(metricName, Long.class);
                final Sample only = entitySamples[0];
                longMetric.accept(entity, getValue(only));
                send(longMetric, only.getTimestamp());
                break;
            case labeled:
                AcceptableValue<DataTable> tableMetric = meterSystem.buildMetrics(metricName, DataTable.class);
                DataTable table = new DataTable();
                // One table entry per sample, keyed by a copy of that sample's labels.
                for (Sample labeledSample : entitySamples) {
                    DataLabel key = new DataLabel();
                    key.putAll(labeledSample.getLabels());
                    table.put(key, getValue(labeledSample));
                }
                tableMetric.accept(entity, table);
                // The timestamp of the first sample is used for the whole table.
                send(tableMetric, entitySamples[0].getTimestamp());
                break;
            case histogram:
            case histogramPercentile:
                // Group the samples by their labels excluding "le", then emit one
                // bucketed value per group.
                Stream.of(entitySamples)
                    .map(raw -> Tuple.of(getDataLabels(raw.getLabels(), k -> !Objects.equals("le", k)), raw))
                    .collect(groupingBy(Tuple2::_1, mapping(Tuple2::_2, toList())))
                    .forEach((groupLabels, groupSamples) -> {
                        if (groupSamples.isEmpty()) {
                            return;
                        }
                        final int count = groupSamples.size();
                        long[] bounds = new long[count];
                        long[] counts = new long[bounds.length];
                        for (int idx = 0; idx < count; idx++) {
                            Sample bucketSample = groupSamples.get(idx);
                            final double le = Double.parseDouble(bucketSample.getLabels().get("le"));
                            // Negative infinity cannot be cast meaningfully, so it is
                            // mapped to Long.MIN_VALUE explicitly.
                            bounds[idx] = le == Double.NEGATIVE_INFINITY ? Long.MIN_VALUE : (long) le;
                            counts[idx] = getValue(bucketSample);
                        }
                        BucketedValues bucketed = new BucketedValues(bounds, counts);
                        bucketed.setLabels(groupLabels);
                        long timestamp = groupSamples.get(0).getTimestamp();
                        if (metricType == MetricType.histogram) {
                            AcceptableValue<BucketedValues> histogramMetric = meterSystem.buildMetrics(
                                metricName, BucketedValues.class);
                            histogramMetric.accept(entity, bucketed);
                            send(histogramMetric, timestamp);
                        } else {
                            // histogramPercentile: wrap the buckets together with the
                            // configured percentiles.
                            AcceptableValue<PercentileArgument> percentileMetric = meterSystem.buildMetrics(
                                metricName, PercentileArgument.class);
                            percentileMetric.accept(entity, new PercentileArgument(bucketed, percentiles));
                            send(percentileMetric, timestamp);
                        }
                    });
                break;
        }
    });
}