in atlas-core/src/main/scala/com/netflix/atlas/core/model/MathExpr.scala [933:1026]
private def estimatePercentiles(
context: EvalContext,
baseLabel: String,
data: List[TimeSeries]
): List[TimeSeries] = {
// If the mapping on top of the data layer puts in a "no data" time series as a
// placeholder, then there will be entries without the percentile tag. Ideally
// it would be fixed at the data layer, but this check provides a better user
// experience otherwise as it will not fail all together.
val filtered = data.filter(_.tags.contains(TagKey.percentile))
if (filtered.isEmpty) {
List(TimeSeries.noData(context.step))
} else {
val length = ((context.end - context.start) / context.step).toInt
// Output time sequences, one for each output percentile we need to estimate
val output = Array.fill[ArrayTimeSeq](pcts.length) {
val buf = ArrayHelper.fill(length, Double.NaN)
new ArrayTimeSeq(DsType.Gauge, context.start, context.step, buf)
}
// Count for each bucket
val counts = new Array[Double](PercentileBuckets.length())
val byBucket = filtered.groupBy { t =>
// Value should have a prefix of T or D, followed by 4 digit hex integer indicating the
// bucket index
val idx = t.tags(TagKey.percentile).substring(1)
Integer.parseInt(idx, 16)
}
// Counts that are actually present in the input
val usedCounts = byBucket.keys.toArray
java.util.Arrays.sort(usedCounts)
// Input sequences
val bounded = new Array[ArrayTimeSeq](usedCounts.length)
var i = 0
while (i < usedCounts.length) {
val vs = byBucket(usedCounts(i))
require(
vs.lengthCompare(1) == 0,
s"invalid percentile encoding: [${vs.map(_.tags(TagKey.percentile)).mkString(",")}]"
)
bounded(i) = vs.head.data.bounded(context.start, context.end)
i += 1
}
// Array percentile results will get written to
val results = new Array[Double](pcts.length)
// If the input was a timer the unit for the buckets is nanoseconds. The type is reflected
// by the prefix of T on the bucket key. After estimating the value we multiply by 1e-9 to
// keep the result in a base unit of seconds.
val isTimer = filtered.head.tags(TagKey.percentile).startsWith("T")
val cnvFactor = if (isTimer) 1e-9 else 1.0
// Loop across each time interval. This section is the tight loop so we keep it as simple
// array accesses and basic loops to minimize performance overhead.
i = 0
while (i < length) {
// Fill in the counts for this interval and compute the estimate
var j = 0
while (j < usedCounts.length) {
val v = bounded(j).data(i)
counts(usedCounts(j)) = if (v.isFinite) v else 0.0
j += 1
}
PercentileBuckets.percentiles(counts, pcts, results)
// Fill in the output sequences with the results
j = 0
while (j < results.length) {
output(j).data(i) = results(j) * cnvFactor
j += 1
}
i += 1
}
// Apply the tags and labels to the output. The percentile values are padded with a
// space so that the decimal place will line up vertically when using a monospace font
// for rendering.
output.toList.zipWithIndex.map {
case (seq, j) =>
val p = pcts(j) match {
case v if v < 10.0 => s" $v"
case v if v < 100.0 => s" $v"
case v => v.toString
}
val tags = data.head.tags + (TagKey.percentile -> p)
TimeSeries(tags, f"percentile($baseLabel, $p)", seq)
}
}
}