in promql/engine.go [2483:2814]
// aggregation evaluates the aggregation operator op (sum, avg, count, min,
// max, group, stddev, stdvar, topk, bottomk, quantile, count_values) over the
// samples of vec for a single evaluation step.
//
// grouping holds the label names of the by/without clause and without selects
// which of the two applies. The labels are expected to be sorted, because the
// pre-computed keys in seriesHelper and generateGroupingKey depend on their
// order. seriesHelper[i].groupingKey must be the grouping key of vec[i].
//
// param carries the operator parameter: a float64 for topk, bottomk and
// quantile, and a string (the output label name) for count_values. It is
// ignored for all other operators.
//
// The aggregated samples are appended to enh.Out, which is returned. Groups
// are emitted in order of first appearance in vec. For topk and bottomk every
// selected input sample is emitted with its original labels. For sum and avg,
// a group that mixes float and histogram samples is dropped from the output.
//
// ev.errorf is called when the topk/bottomk parameter overflows int64 or when
// the count_values label name is invalid. The function panics for an op that
// is not an aggregation operator.
func (ev *evaluator) aggregation(op parser.ItemType, grouping []string, without bool, param interface{}, vec Vector, seriesHelper []EvalSeriesHelper, enh *EvalNodeHelper) Vector {
	result := map[uint64]*groupedAggregation{}
	// orderedResult records groups in order of first appearance so the output
	// order does not depend on map iteration order.
	orderedResult := []*groupedAggregation{}
	var k int64
	if op == parser.TOPK || op == parser.BOTTOMK {
		f := param.(float64)
		if !convertibleToInt64(f) {
			ev.errorf("Scalar value %v overflows int64", f)
		}
		k = int64(f)
		if k < 1 {
			return Vector{}
		}
	}
	var q float64
	if op == parser.QUANTILE {
		q = param.(float64)
	}
	var valueLabel string
	var recomputeGroupingKey bool
	if op == parser.COUNT_VALUES {
		valueLabel = param.(string)
		if !model.LabelName(valueLabel).IsValid() {
			ev.errorf("invalid label name %q", valueLabel)
		}
		if !without {
			// We're changing the grouping labels so we have to ensure they're still sorted
			// and we have to flag to recompute the grouping key. Considering the count_values()
			// operator is less frequently used than other aggregations, we're fine having to
			// re-compute the grouping key on each step for this case.
			//
			// grouping belongs to the caller. Appending to it directly could write
			// into its backing array (when it has spare capacity), and the in-place
			// sort below would then reorder the caller's labels. Build a private
			// copy with room for the extra label instead.
			extended := make([]string, 0, len(grouping)+1)
			extended = append(extended, grouping...)
			grouping = append(extended, valueLabel)
			slices.Sort(grouping)
			recomputeGroupingKey = true
		}
	}

	// Initial capacity of the per-group heaps used by topk, bottomk and
	// quantile: k, capped at the number of input samples. k is 0 for every
	// operator other than topk/bottomk, in which case a capacity of 1 is used.
	resultSize := k
	switch {
	case k > int64(len(vec)):
		resultSize = int64(len(vec))
	case k == 0:
		resultSize = 1
	}

	var buf []byte
	for si, s := range vec {
		metric := s.Metric
		if op == parser.COUNT_VALUES {
			enh.resetBuilder(metric)
			enh.lb.Set(valueLabel, strconv.FormatFloat(s.F, 'f', -1, 64))
			metric = enh.lb.Labels()
			// We've changed the metric so we have to recompute the grouping key.
			recomputeGroupingKey = true
		}
		// We can use the pre-computed grouping key unless grouping labels have changed.
		var groupingKey uint64
		if !recomputeGroupingKey {
			groupingKey = seriesHelper[si].groupingKey
		} else {
			groupingKey, buf = generateGroupingKey(metric, grouping, without, buf)
		}
		group, ok := result[groupingKey]
		// Add a new group if it doesn't exist.
		if !ok {
			var m labels.Labels
			enh.resetBuilder(metric)
			switch {
			case without:
				enh.lb.Del(grouping...)
				enh.lb.Del(labels.MetricName)
				m = enh.lb.Labels()
			case len(grouping) > 0:
				enh.lb.Keep(grouping...)
				m = enh.lb.Labels()
			default:
				m = labels.EmptyLabels()
			}
			newAgg := &groupedAggregation{
				labels:     m,
				floatValue: s.F,
				floatMean:  s.F,
				groupCount: 1,
			}
			switch {
			case s.H == nil:
				newAgg.hasFloat = true
			case op == parser.SUM:
				newAgg.histogramValue = s.H.Copy()
				newAgg.hasHistogram = true
			case op == parser.AVG:
				newAgg.histogramMean = s.H.Copy()
				newAgg.hasHistogram = true
			case op == parser.STDVAR || op == parser.STDDEV:
				// Native histograms are ignored by stddev/stdvar, so a group that
				// starts with one has not counted any sample yet.
				newAgg.groupCount = 0
			}
			result[groupingKey] = newAgg
			orderedResult = append(orderedResult, newAgg)

			switch op {
			case parser.STDVAR, parser.STDDEV:
				newAgg.floatValue = 0
			case parser.TOPK, parser.QUANTILE:
				newAgg.heap = make(vectorByValueHeap, 1, resultSize)
				newAgg.heap[0] = Sample{
					F:      s.F,
					Metric: s.Metric,
				}
			case parser.BOTTOMK:
				newAgg.reverseHeap = make(vectorByReverseValueHeap, 1, resultSize)
				newAgg.reverseHeap[0] = Sample{
					F:      s.F,
					Metric: s.Metric,
				}
			case parser.GROUP:
				newAgg.floatValue = 1
			}
			continue
		}

		switch op {
		case parser.SUM:
			if s.H != nil {
				group.hasHistogram = true
				if group.histogramValue != nil {
					// The histogram being added must have
					// an equal or larger schema.
					if s.H.Schema >= group.histogramValue.Schema {
						group.histogramValue.Add(s.H)
					} else {
						group.histogramValue = s.H.Copy().Add(group.histogramValue)
					}
				}
				// Otherwise the aggregation contained floats
				// previously and will be invalid anyway. No
				// point in copying the histogram in that case.
			} else {
				group.hasFloat = true
				group.floatValue += s.F
			}

		case parser.AVG:
			group.groupCount++
			if s.H != nil {
				group.hasHistogram = true
				if group.histogramMean != nil {
					left := s.H.Copy().Div(float64(group.groupCount))
					right := group.histogramMean.Copy().Div(float64(group.groupCount))
					// The histogram being added/subtracted must have
					// an equal or larger schema.
					if s.H.Schema >= group.histogramMean.Schema {
						toAdd := right.Mul(-1).Add(left)
						group.histogramMean.Add(toAdd)
					} else {
						toAdd := left.Sub(right)
						group.histogramMean = toAdd.Add(group.histogramMean)
					}
				}
				// Otherwise the aggregation contained floats
				// previously and will be invalid anyway. No
				// point in copying the histogram in that case.
			} else {
				group.hasFloat = true
				if math.IsInf(group.floatMean, 0) {
					if math.IsInf(s.F, 0) && (group.floatMean > 0) == (s.F > 0) {
						// The `floatMean` and `s.F` values are `Inf` of the same sign. They
						// can't be subtracted, but the value of `floatMean` is correct
						// already.
						break
					}
					if !math.IsInf(s.F, 0) && !math.IsNaN(s.F) {
						// At this stage, the mean is infinite. If the added
						// value is neither an Inf nor a NaN, we can keep that mean
						// value.
						// This is required because our calculation below removes
						// the mean value, which would look like Inf += x - Inf and
						// end up as a NaN.
						break
					}
				}
				// Divide each side of the `-` by `group.groupCount` to avoid float64 overflows.
				group.floatMean += s.F/float64(group.groupCount) - group.floatMean/float64(group.groupCount)
			}

		case parser.GROUP:
			// Do nothing. Required to avoid the panic in `default:` below.

		case parser.MAX:
			if group.floatValue < s.F || math.IsNaN(group.floatValue) {
				group.floatValue = s.F
			}

		case parser.MIN:
			if group.floatValue > s.F || math.IsNaN(group.floatValue) {
				group.floatValue = s.F
			}

		case parser.COUNT, parser.COUNT_VALUES:
			group.groupCount++

		case parser.STDVAR, parser.STDDEV:
			if s.H == nil { // Ignore native histograms.
				// Welford's online update: floatMean tracks the running mean and
				// floatValue the running sum of squared deviations.
				group.groupCount++
				delta := s.F - group.floatMean
				group.floatMean += delta / float64(group.groupCount)
				group.floatValue += delta * (s.F - group.floatMean)
			}

		case parser.TOPK:
			// We build a heap of up to k elements, with the smallest element at heap[0].
			switch {
			case int64(len(group.heap)) < k:
				heap.Push(&group.heap, &Sample{
					F:      s.F,
					Metric: s.Metric,
				})
			case group.heap[0].F < s.F || (math.IsNaN(group.heap[0].F) && !math.IsNaN(s.F)):
				// This new element is bigger than the previous smallest element - overwrite that.
				group.heap[0] = Sample{
					F:      s.F,
					Metric: s.Metric,
				}
				if k > 1 {
					heap.Fix(&group.heap, 0) // Maintain the heap invariant.
				}
			}

		case parser.BOTTOMK:
			// We build a heap of up to k elements, with the biggest element at heap[0].
			switch {
			case int64(len(group.reverseHeap)) < k:
				heap.Push(&group.reverseHeap, &Sample{
					F:      s.F,
					Metric: s.Metric,
				})
			case group.reverseHeap[0].F > s.F || (math.IsNaN(group.reverseHeap[0].F) && !math.IsNaN(s.F)):
				// This new element is smaller than the previous biggest element - overwrite that.
				group.reverseHeap[0] = Sample{
					F:      s.F,
					Metric: s.Metric,
				}
				if k > 1 {
					heap.Fix(&group.reverseHeap, 0) // Maintain the heap invariant.
				}
			}

		case parser.QUANTILE:
			// Collect all samples; the quantile is computed once per group below.
			group.heap = append(group.heap, s)

		default:
			panic(fmt.Errorf("expected aggregation operator but got %q", op))
		}
	}

	// Construct the result Vector from the aggregated groups.
	for _, aggr := range orderedResult {
		switch op {
		case parser.AVG:
			if aggr.hasFloat && aggr.hasHistogram {
				// We cannot aggregate histogram sample with a float64 sample.
				// TODO(zenador): Issue warning when plumbing is in place.
				continue
			}
			if aggr.hasHistogram {
				aggr.histogramValue = aggr.histogramMean.Compact(0)
			} else {
				aggr.floatValue = aggr.floatMean
			}

		case parser.COUNT, parser.COUNT_VALUES:
			aggr.floatValue = float64(aggr.groupCount)

		case parser.STDVAR:
			aggr.floatValue /= float64(aggr.groupCount)

		case parser.STDDEV:
			aggr.floatValue = math.Sqrt(aggr.floatValue / float64(aggr.groupCount))

		case parser.TOPK:
			// The heap keeps the lowest value on top, so reverse it.
			if len(aggr.heap) > 1 {
				sort.Sort(sort.Reverse(aggr.heap))
			}
			for _, v := range aggr.heap {
				enh.Out = append(enh.Out, Sample{
					Metric: v.Metric,
					F:      v.F,
				})
			}
			continue // Bypass default append.

		case parser.BOTTOMK:
			// The heap keeps the highest value on top, so reverse it.
			if len(aggr.reverseHeap) > 1 {
				sort.Sort(sort.Reverse(aggr.reverseHeap))
			}
			for _, v := range aggr.reverseHeap {
				enh.Out = append(enh.Out, Sample{
					Metric: v.Metric,
					F:      v.F,
				})
			}
			continue // Bypass default append.

		case parser.QUANTILE:
			aggr.floatValue = quantile(q, aggr.heap)

		case parser.SUM:
			if aggr.hasFloat && aggr.hasHistogram {
				// We cannot aggregate histogram sample with a float64 sample.
				// TODO(zenador): Issue warning when plumbing is in place.
				continue
			}
			if aggr.hasHistogram {
				aggr.histogramValue.Compact(0)
			}

		default:
			// For other aggregations, we already have the right value.
		}

		enh.Out = append(enh.Out, Sample{
			Metric: aggr.labels,
			F:      aggr.floatValue,
			H:      aggr.histogramValue,
		})
	}
	return enh.Out
}