func()

in promql/engine.go [2483:2814]


// aggregation evaluates the aggregation operator op over the samples in vec for
// a single evaluation step and appends one result per group to enh.Out, which
// it then returns (topk/bottomk append every retained sample of each group).
//
// param carries the operator's argument: a float64 k for topk/bottomk, a
// float64 quantile for quantile, and a string label name for count_values; the
// other operators ignore it. grouping lists the by/without labels, with without
// selecting the "without" form. The code expects grouping to be sorted (the
// count_values path re-sorts after adding its label). grouping is treated as
// read-only: the caller's backing array is never written to.
//
// seriesHelper[i].groupingKey is the pre-computed grouping key for vec[i]. It is
// used unless the grouping labels or the series labels are changed here, which
// only happens for count_values.
//
// Invalid arguments (k overflowing int64, an invalid count_values label name)
// are reported through ev.errorf. A k below 1 yields an empty Vector.
func (ev *evaluator) aggregation(op parser.ItemType, grouping []string, without bool, param interface{}, vec Vector, seriesHelper []EvalSeriesHelper, enh *EvalNodeHelper) Vector {
	result := map[uint64]*groupedAggregation{}
	orderedResult := []*groupedAggregation{}
	var k int64
	if op == parser.TOPK || op == parser.BOTTOMK {
		f := param.(float64)
		if !convertibleToInt64(f) {
			ev.errorf("Scalar value %v overflows int64", f)
		}
		k = int64(f)
		if k < 1 {
			return Vector{}
		}
	}
	var q float64
	if op == parser.QUANTILE {
		q = param.(float64)
	}
	var valueLabel string
	var recomputeGroupingKey bool
	if op == parser.COUNT_VALUES {
		valueLabel = param.(string)
		if !model.LabelName(valueLabel).IsValid() {
			ev.errorf("invalid label name %q", valueLabel)
		}
		if !without {
			// We're changing the grouping labels so we have to ensure they're still sorted
			// and we have to flag to recompute the grouping key. Considering the count_values()
			// operator is less frequently used than other aggregations, we're fine having to
			// re-compute the grouping key on each step for this case.
			//
			// The full slice expression caps the capacity at the current length,
			// so append always copies into a fresh array. A plain append could
			// write into spare capacity of the caller's slice, and the in-place
			// sort would then reorder labels the caller still sees. That would
			// corrupt the grouping used on later evaluation steps.
			grouping = append(grouping[:len(grouping):len(grouping)], valueLabel)
			slices.Sort(grouping)
			recomputeGroupingKey = true
		}
	}

	// Capacity hint for the per-group heaps used by topk, bottomk and quantile:
	// at most k entries, never more than the number of input series, and at
	// least one (k stays 0 for every operator other than topk/bottomk).
	heapCap := k
	switch {
	case k > int64(len(vec)):
		heapCap = int64(len(vec))
	case k == 0:
		heapCap = 1
	}

	var buf []byte
	for si, s := range vec {
		metric := s.Metric

		if op == parser.COUNT_VALUES {
			enh.resetBuilder(metric)
			enh.lb.Set(valueLabel, strconv.FormatFloat(s.F, 'f', -1, 64))
			metric = enh.lb.Labels()

			// We've changed the metric so we have to recompute the grouping key.
			recomputeGroupingKey = true
		}

		// We can use the pre-computed grouping key unless grouping labels have changed.
		var groupingKey uint64
		if !recomputeGroupingKey {
			groupingKey = seriesHelper[si].groupingKey
		} else {
			groupingKey, buf = generateGroupingKey(metric, grouping, without, buf)
		}

		group, ok := result[groupingKey]
		// Add a new group if it doesn't exist.
		if !ok {
			var m labels.Labels
			enh.resetBuilder(metric)
			switch {
			case without:
				enh.lb.Del(grouping...)
				enh.lb.Del(labels.MetricName)
				m = enh.lb.Labels()
			case len(grouping) > 0:
				enh.lb.Keep(grouping...)
				m = enh.lb.Labels()
			default:
				m = labels.EmptyLabels()
			}
			newAgg := &groupedAggregation{
				labels:     m,
				floatValue: s.F,
				floatMean:  s.F,
				groupCount: 1,
			}
			switch {
			case s.H == nil:
				newAgg.hasFloat = true
			case op == parser.SUM:
				newAgg.histogramValue = s.H.Copy()
				newAgg.hasHistogram = true
			case op == parser.AVG:
				newAgg.histogramMean = s.H.Copy()
				newAgg.hasHistogram = true
			case op == parser.STDVAR || op == parser.STDDEV:
				// Native histograms are ignored by stdvar/stddev, so they
				// do not count towards the group size.
				newAgg.groupCount = 0
			}

			// Operator-specific initial state for the new group.
			switch op {
			case parser.STDVAR, parser.STDDEV:
				newAgg.floatValue = 0
			case parser.TOPK, parser.QUANTILE:
				newAgg.heap = make(vectorByValueHeap, 1, heapCap)
				newAgg.heap[0] = Sample{
					F:      s.F,
					Metric: s.Metric,
				}
			case parser.BOTTOMK:
				newAgg.reverseHeap = make(vectorByReverseValueHeap, 1, heapCap)
				newAgg.reverseHeap[0] = Sample{
					F:      s.F,
					Metric: s.Metric,
				}
			case parser.GROUP:
				newAgg.floatValue = 1
			}

			result[groupingKey] = newAgg
			orderedResult = append(orderedResult, newAgg)
			continue
		}

		switch op {
		case parser.SUM:
			if s.H != nil {
				group.hasHistogram = true
				if group.histogramValue != nil {
					// The histogram being added must have
					// an equal or larger schema.
					if s.H.Schema >= group.histogramValue.Schema {
						group.histogramValue.Add(s.H)
					} else {
						group.histogramValue = s.H.Copy().Add(group.histogramValue)
					}
				}
				// Otherwise the aggregation contained floats
				// previously and will be invalid anyway. No
				// point in copying the histogram in that case.
			} else {
				group.hasFloat = true
				group.floatValue += s.F
			}

		case parser.AVG:
			group.groupCount++
			if s.H != nil {
				group.hasHistogram = true
				if group.histogramMean != nil {
					left := s.H.Copy().Div(float64(group.groupCount))
					right := group.histogramMean.Copy().Div(float64(group.groupCount))
					// The histogram being added/subtracted must have
					// an equal or larger schema.
					if s.H.Schema >= group.histogramMean.Schema {
						toAdd := right.Mul(-1).Add(left)
						group.histogramMean.Add(toAdd)
					} else {
						toAdd := left.Sub(right)
						group.histogramMean = toAdd.Add(group.histogramMean)
					}
				}
				// Otherwise the aggregation contained floats
				// previously and will be invalid anyway. No
				// point in copying the histogram in that case.
			} else {
				group.hasFloat = true
				if math.IsInf(group.floatMean, 0) {
					if math.IsInf(s.F, 0) && (group.floatMean > 0) == (s.F > 0) {
						// The `floatMean` and `s.F` values are `Inf` of the same sign.  They
						// can't be subtracted, but the value of `floatMean` is correct
						// already.
						break
					}
					if !math.IsInf(s.F, 0) && !math.IsNaN(s.F) {
						// At this stage, the mean is an infinite. If the added
						// value is neither an Inf or a Nan, we can keep that mean
						// value.
						// This is required because our calculation below removes
						// the mean value, which would look like Inf += x - Inf and
						// end up as a NaN.
						break
					}
				}
				// Divide each side of the `-` by `group.groupCount` to avoid float64 overflows.
				group.floatMean += s.F/float64(group.groupCount) - group.floatMean/float64(group.groupCount)
			}

		case parser.GROUP:
			// Do nothing. Required to avoid the panic in `default:` below.

		case parser.MAX:
			if group.floatValue < s.F || math.IsNaN(group.floatValue) {
				group.floatValue = s.F
			}

		case parser.MIN:
			if group.floatValue > s.F || math.IsNaN(group.floatValue) {
				group.floatValue = s.F
			}

		case parser.COUNT, parser.COUNT_VALUES:
			group.groupCount++

		case parser.STDVAR, parser.STDDEV:
			if s.H == nil { // Ignore native histograms.
				// Incremental (Welford-style) update of the running mean and
				// of the sum of squared deviations kept in floatValue.
				group.groupCount++
				delta := s.F - group.floatMean
				group.floatMean += delta / float64(group.groupCount)
				group.floatValue += delta * (s.F - group.floatMean)
			}

		case parser.TOPK:
			// We build a heap of up to k elements, with the smallest element at heap[0].
			switch {
			case int64(len(group.heap)) < k:
				heap.Push(&group.heap, &Sample{
					F:      s.F,
					Metric: s.Metric,
				})
			case group.heap[0].F < s.F || (math.IsNaN(group.heap[0].F) && !math.IsNaN(s.F)):
				// This new element is bigger than the previous smallest element - overwrite that.
				group.heap[0] = Sample{
					F:      s.F,
					Metric: s.Metric,
				}
				if k > 1 {
					heap.Fix(&group.heap, 0) // Maintain the heap invariant.
				}
			}

		case parser.BOTTOMK:
			// We build a heap of up to k elements, with the biggest element at heap[0].
			switch {
			case int64(len(group.reverseHeap)) < k:
				heap.Push(&group.reverseHeap, &Sample{
					F:      s.F,
					Metric: s.Metric,
				})
			case group.reverseHeap[0].F > s.F || (math.IsNaN(group.reverseHeap[0].F) && !math.IsNaN(s.F)):
				// This new element is smaller than the previous biggest element - overwrite that.
				group.reverseHeap[0] = Sample{
					F:      s.F,
					Metric: s.Metric,
				}
				if k > 1 {
					heap.Fix(&group.reverseHeap, 0) // Maintain the heap invariant.
				}
			}

		case parser.QUANTILE:
			// Collect every sample; the quantile is computed once all are in.
			group.heap = append(group.heap, s)

		default:
			panic(fmt.Errorf("expected aggregation operator but got %q", op))
		}
	}

	// Construct the result Vector from the aggregated groups.
	for _, aggr := range orderedResult {
		switch op {
		case parser.AVG:
			if aggr.hasFloat && aggr.hasHistogram {
				// We cannot aggregate histogram sample with a float64 sample.
				// TODO(zenador): Issue warning when plumbing is in place.
				continue
			}
			if aggr.hasHistogram {
				aggr.histogramValue = aggr.histogramMean.Compact(0)
			} else {
				aggr.floatValue = aggr.floatMean
			}

		case parser.COUNT, parser.COUNT_VALUES:
			aggr.floatValue = float64(aggr.groupCount)

		case parser.STDVAR:
			// NOTE: a group made only of native histograms has groupCount 0
			// and floatValue 0 here, so this yields NaN (0/0).
			aggr.floatValue /= float64(aggr.groupCount)

		case parser.STDDEV:
			// Same 0/0 -> NaN caveat as STDVAR for histogram-only groups.
			aggr.floatValue = math.Sqrt(aggr.floatValue / float64(aggr.groupCount))

		case parser.TOPK:
			// The heap keeps the lowest value on top, so reverse it.
			if len(aggr.heap) > 1 {
				sort.Sort(sort.Reverse(aggr.heap))
			}
			for _, v := range aggr.heap {
				enh.Out = append(enh.Out, Sample{
					Metric: v.Metric,
					F:      v.F,
				})
			}
			continue // Bypass default append.

		case parser.BOTTOMK:
			// The heap keeps the highest value on top, so reverse it.
			if len(aggr.reverseHeap) > 1 {
				sort.Sort(sort.Reverse(aggr.reverseHeap))
			}
			for _, v := range aggr.reverseHeap {
				enh.Out = append(enh.Out, Sample{
					Metric: v.Metric,
					F:      v.F,
				})
			}
			continue // Bypass default append.

		case parser.QUANTILE:
			aggr.floatValue = quantile(q, aggr.heap)

		case parser.SUM:
			if aggr.hasFloat && aggr.hasHistogram {
				// We cannot aggregate histogram sample with a float64 sample.
				// TODO(zenador): Issue warning when plumbing is in place.
				continue
			}
			if aggr.hasHistogram {
				aggr.histogramValue.Compact(0)
			}
		default:
			// For other aggregations, we already have the right value.
		}

		enh.Out = append(enh.Out, Sample{
			Metric: aggr.labels,
			F:      aggr.floatValue,
			H:      aggr.histogramValue,
		})
	}
	return enh.Out
}