in promql/engine.go [1078:1266]
// rangeEval evaluates every non-string expression in exprs once over the
// whole query range and then, for each step between ev.startTimestamp and
// ev.endTimestamp, calls funcCall with one input vector per expression holding
// the samples found at that step. The per-step results are stitched together
// into the returned Matrix.
//
// If prepSeries is non-nil it is invoked once per input series before the
// first step; the helper it fills in is handed to funcCall (second argument)
// for every step in which that series contributes a sample. If prepSeries is
// nil, funcCall receives a nil helper slice.
//
// Expressions that are nil or of string type are not evaluated; their slot in
// the argument list passed to funcCall is an empty Vector.
//
// When start and end timestamps are equal the result of the single funcCall is
// returned directly, in the order funcCall produced it. Otherwise the output
// series are collected from a map, so their order is unspecified.
//
// ev.currentSamples is kept up to date while stepping and is reported to
// ev.samplesStats; exceeding ev.maxSamples is reported through ev.error.
func (ev *evaluator) rangeEval(prepSeries func(labels.Labels, *EvalSeriesHelper), funcCall func([]parser.Value, [][]EvalSeriesHelper, *EvalNodeHelper) (Vector, storage.Warnings), exprs ...parser.Expr) (Matrix, storage.Warnings) {
	numSteps := int((ev.endTimestamp-ev.startTimestamp)/ev.interval) + 1
	matrixes := make([]Matrix, len(exprs))
	origMatrixes := make([]Matrix, len(exprs))
	// Sample count before any argument was evaluated; the final count is
	// rebuilt from this baseline plus the size of the output.
	originalNumSamples := ev.currentSamples

	var warnings storage.Warnings
	for i, e := range exprs {
		// Functions will take string arguments from the expressions, not the values.
		if e == nil || e.Type() == parser.ValueTypeString {
			continue
		}
		// ev.currentSamples will be updated to the correct value within the ev.eval call.
		val, ws := ev.eval(e)
		warnings = append(warnings, ws...)
		matrixes[i] = val.(Matrix)

		// Keep a copy of the original point slices so that they can be
		// returned to the pool: the slices in matrixes are advanced while
		// stepping and no longer start at the original first element.
		origMatrixes[i] = make(Matrix, len(matrixes[i]))
		copy(origMatrixes[i], matrixes[i])
	}

	vectors := make([]Vector, len(exprs))    // Input vectors for the function.
	args := make([]parser.Value, len(exprs)) // Argument to function.
	// Create an output vector that is as big as the input matrix with
	// the most time series.
	biggestLen := 1
	for i := range exprs {
		vectors[i] = make(Vector, 0, len(matrixes[i]))
		if len(matrixes[i]) > biggestLen {
			biggestLen = len(matrixes[i])
		}
	}
	enh := &EvalNodeHelper{Out: make(Vector, 0, biggestLen)}
	// Output series by series hash.
	// NOTE(review): the key is Metric.Hash() alone, so two different label
	// sets with the same hash would be merged into a single output series.
	seriess := make(map[uint64]Series, biggestLen)
	// Samples held in memory between steps: the evaluated arguments plus every
	// result produced so far. ev.currentSamples is reset to this at each step.
	tempNumSamples := ev.currentSamples

	var (
		seriesHelpers [][]EvalSeriesHelper
		bufHelpers    [][]EvalSeriesHelper // Buffer updated on each step.
	)

	// If the series preparation function is provided, we should run it for
	// every single series in the matrix.
	if prepSeries != nil {
		seriesHelpers = make([][]EvalSeriesHelper, len(exprs))
		bufHelpers = make([][]EvalSeriesHelper, len(exprs))

		for i := range exprs {
			seriesHelpers[i] = make([]EvalSeriesHelper, len(matrixes[i]))
			bufHelpers[i] = make([]EvalSeriesHelper, len(matrixes[i]))

			for si, series := range matrixes[i] {
				h := seriesHelpers[i][si]
				prepSeries(series.Metric, &h)
				seriesHelpers[i][si] = h
			}
		}
	}

	for ts := ev.startTimestamp; ts <= ev.endTimestamp; ts += ev.interval {
		if err := contextDone(ev.ctx, "expression evaluation"); err != nil {
			ev.error(err)
		}
		// Reset number of samples in memory after each timestamp.
		ev.currentSamples = tempNumSamples

		// Gather input vectors for this timestamp.
		for i := range exprs {
			vectors[i] = vectors[i][:0]
			if prepSeries != nil {
				bufHelpers[i] = bufHelpers[i][:0]
			}

			for si, series := range matrixes[i] {
				// Only the first remaining point of each kind is inspected.
				// It is consumed when its timestamp equals the current step,
				// so past points never have to be re-scanned at later steps.
				if len(series.Floats) > 0 && series.Floats[0].T == ts {
					if ev.currentSamples < ev.maxSamples {
						vectors[i] = append(vectors[i], Sample{Metric: series.Metric, F: series.Floats[0].F, T: ts})
						if prepSeries != nil {
							bufHelpers[i] = append(bufHelpers[i], seriesHelpers[i][si])
						}
						// Move the input series forward past the consumed point.
						matrixes[i][si].Floats = series.Floats[1:]
						ev.currentSamples++
					} else {
						ev.error(ErrTooManySamples(env))
					}
				}
				if len(series.Histograms) > 0 && series.Histograms[0].T == ts {
					if ev.currentSamples < ev.maxSamples {
						vectors[i] = append(vectors[i], Sample{Metric: series.Metric, H: series.Histograms[0].H, T: ts})
						if prepSeries != nil {
							bufHelpers[i] = append(bufHelpers[i], seriesHelpers[i][si])
						}
						// Move the input series forward past the consumed point.
						matrixes[i][si].Histograms = series.Histograms[1:]
						ev.currentSamples++
					} else {
						ev.error(ErrTooManySamples(env))
					}
				}
			}
			args[i] = vectors[i]
			ev.samplesStats.UpdatePeak(ev.currentSamples)
		}

		// Make the function call.
		enh.Ts = ts
		result, ws := funcCall(args, bufHelpers, enh)
		if result.ContainsSameLabelset() {
			ev.errorf("vector cannot contain metrics with the same labelset")
		}
		enh.Out = result[:0] // Reuse result vector.
		warnings = append(warnings, ws...)

		ev.currentSamples += len(result)
		// When we reset currentSamples to tempNumSamples during the next iteration of the loop it also
		// needs to include the samples from the result here, as they're still in memory.
		tempNumSamples += len(result)
		// Record the peak before the limit check so that it is captured even
		// if the check below reports an error.
		ev.samplesStats.UpdatePeak(ev.currentSamples)
		if ev.currentSamples > ev.maxSamples {
			ev.error(ErrTooManySamples(env))
		}

		// If this could be an instant query, shortcut so as not to change sort order.
		if ev.endTimestamp == ev.startTimestamp {
			mat := make(Matrix, len(result))
			for i, s := range result {
				if s.H == nil {
					mat[i] = Series{Metric: s.Metric, Floats: []FPoint{{T: ts, F: s.F}}}
				} else {
					mat[i] = Series{Metric: s.Metric, Histograms: []HPoint{{T: ts, H: s.H}}}
				}
			}
			ev.currentSamples = originalNumSamples + mat.TotalSamples()
			ev.samplesStats.UpdatePeak(ev.currentSamples)
			return mat, warnings
		}

		// Add samples in output vector to output series.
		for _, sample := range result {
			h := sample.Metric.Hash()
			ss, ok := seriess[h]
			if !ok {
				ss = Series{Metric: sample.Metric}
			}
			if sample.H == nil {
				// Point slices are requested lazily, sized for one point per step.
				if ss.Floats == nil {
					ss.Floats = getFPointSlice(numSteps)
				}
				ss.Floats = append(ss.Floats, FPoint{T: ts, F: sample.F})
			} else {
				if ss.Histograms == nil {
					ss.Histograms = getHPointSlice(numSteps)
				}
				ss.Histograms = append(ss.Histograms, HPoint{T: ts, H: sample.H})
			}
			seriess[h] = ss
		}
	}

	// Reuse the original point slices.
	for _, m := range origMatrixes {
		for _, s := range m {
			putFPointSlice(s.Floats)
			putHPointSlice(s.Histograms)
		}
	}

	// Assemble the output matrix. By the time we get here we know we don't have too many samples.
	mat := make(Matrix, 0, len(seriess))
	for _, ss := range seriess {
		mat = append(mat, ss)
	}
	ev.currentSamples = originalNumSamples + mat.TotalSamples()
	ev.samplesStats.UpdatePeak(ev.currentSamples)
	return mat, warnings
}