func (g *Group) Eval(ctx context.Context, ts time.Time)

in rules/manager.go [629:764]


func (g *Group) Eval(ctx context.Context, ts time.Time) {
	var samplesTotal float64
	for i, rule := range g.rules {
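		// Stop evaluating the remaining rules if the group has been asked to shut down.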
		select {
		case <-g.done:
			return
		default:
		}

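		// Each rule is evaluated inside its own closure so the deferred cleanup
		// (span end, duration bookkeeping, appender commit) runs once per rule.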
		func(i int, rule Rule) {
			ctx, sp := otel.Tracer("").Start(ctx, "rule")
			sp.SetAttributes(attribute.String("name", rule.Name()))
			defer func(t time.Time) {
				sp.End()

				since := time.Since(t)
				g.metrics.EvalDuration.Observe(since.Seconds())
				rule.SetEvaluationDuration(since)
				rule.SetEvaluationTimestamp(t)
			}(time.Now())

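			// Count every evaluation attempt for this group, successful or not.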
			g.metrics.EvalTotal.WithLabelValues(GroupKey(g.File(), g.Name())).Inc()

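			// Evaluate the rule expression at ts using the group's QueryFunc.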
			vector, err := rule.Eval(ctx, ts, g.opts.QueryFunc, g.opts.ExternalURL, g.Limit())
			if err != nil {
				rule.SetHealth(HealthBad)
				rule.SetLastError(err)
				sp.SetStatus(codes.Error, err.Error())
				g.metrics.EvalFailures.WithLabelValues(GroupKey(g.File(), g.Name())).Inc()

				// Canceled queries are intentional termination of queries. This normally
				// happens on shutdown and thus we skip logging of any errors here.
				var eqc promql.ErrQueryCanceled
				if !errors.As(err, &eqc) {
					level.Warn(g.logger).Log("name", rule.Name(), "index", i, "msg", "Evaluating rule failed", "rule", rule, "err", err)
				}
				return
			}
			rule.SetHealth(HealthGood)
			rule.SetLastError(nil)
			samplesTotal += float64(len(vector))

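			// Alerting rules additionally forward any alerts that are due via the configured NotifyFunc.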
			if ar, ok := rule.(*AlertingRule); ok {
				ar.sendAlerts(ctx, ts, g.opts.ResendDelay, g.interval, g.opts.NotifyFunc)
			}
			var (
				numOutOfOrder = 0
				numTooOld     = 0
				numDuplicates = 0
			)

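			// Open an appender for the evaluation results; the deferred commit below
			// also covers the staleness markers appended further down.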
			app := g.opts.Appendable.Appender(ctx)
			seriesReturned := make(map[string]labels.Labels, len(g.seriesInPreviousEval[i]))
			defer func() {
				if err := app.Commit(); err != nil {
					rule.SetHealth(HealthBad)
					rule.SetLastError(err)
					sp.SetStatus(codes.Error, err.Error())
					g.metrics.EvalFailures.WithLabelValues(GroupKey(g.File(), g.Name())).Inc()

					level.Warn(g.logger).Log("name", rule.Name(), "index", i, "msg", "Rule sample appending failed", "err", err)
					return
				}
				g.seriesInPreviousEval[i] = seriesReturned
			}()

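			// Append every sample (float or histogram) from the result vector,
			// tallying the expected kinds of rejected samples.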
			for _, s := range vector {
				if s.H != nil {
					_, err = app.AppendHistogram(0, s.Metric, s.T, nil, s.H)
				} else {
					_, err = app.Append(0, s.Metric, s.T, s.F)
				}

				if err != nil {
					rule.SetHealth(HealthBad)
					rule.SetLastError(err)
					sp.SetStatus(codes.Error, err.Error())
					unwrappedErr := errors.Unwrap(err)
					if unwrappedErr == nil {
						unwrappedErr = err
					}
					switch {
					case errors.Is(unwrappedErr, storage.ErrOutOfOrderSample):
						numOutOfOrder++
						level.Debug(g.logger).Log("name", rule.Name(), "index", i, "msg", "Rule evaluation result discarded", "err", err, "sample", s)
					case errors.Is(unwrappedErr, storage.ErrTooOldSample):
						numTooOld++
						level.Debug(g.logger).Log("name", rule.Name(), "index", i, "msg", "Rule evaluation result discarded", "err", err, "sample", s)
					case errors.Is(unwrappedErr, storage.ErrDuplicateSampleForTimestamp):
						numDuplicates++
						level.Debug(g.logger).Log("name", rule.Name(), "index", i, "msg", "Rule evaluation result discarded", "err", err, "sample", s)
					default:
						level.Warn(g.logger).Log("name", rule.Name(), "index", i, "msg", "Rule evaluation result discarded", "err", err, "sample", s)
					}
				} else {
					buf := [1024]byte{}
					seriesReturned[string(s.Metric.Bytes(buf[:]))] = s.Metric
				}
			}
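			// Rejections for expected reasons were logged at debug level above;
			// summarize them here at warning level.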
			if numOutOfOrder > 0 {
				level.Warn(g.logger).Log("name", rule.Name(), "index", i, "msg", "Error on ingesting out-of-order result from rule evaluation", "numDropped", numOutOfOrder)
			}
			if numTooOld > 0 {
				level.Warn(g.logger).Log("name", rule.Name(), "index", i, "msg", "Error on ingesting too old result from rule evaluation", "numDropped", numTooOld)
			}
			if numDuplicates > 0 {
				level.Warn(g.logger).Log("name", rule.Name(), "index", i, "msg", "Error on ingesting results from rule evaluation with different value but same timestamp", "numDropped", numDuplicates)
			}

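			// Any series returned by the previous evaluation but absent from this one is marked stale.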
			for metric, lset := range g.seriesInPreviousEval[i] {
				if _, ok := seriesReturned[metric]; !ok {
					// Series no longer exposed, mark it stale.
					_, err = app.Append(0, lset, timestamp.FromTime(ts), math.Float64frombits(value.StaleNaN))
					unwrappedErr := errors.Unwrap(err)
					if unwrappedErr == nil {
						unwrappedErr = err
					}
					switch {
					case unwrappedErr == nil:
					case errors.Is(unwrappedErr, storage.ErrOutOfOrderSample),
						errors.Is(unwrappedErr, storage.ErrTooOldSample),
						errors.Is(unwrappedErr, storage.ErrDuplicateSampleForTimestamp):
						// Do not count these in logging, as this is expected if series
						// is exposed from a different rule.
					default:
						level.Warn(g.logger).Log("name", rule.Name(), "index", i, "msg", "Adding stale sample failed", "sample", lset.String(), "err", err)
					}
				}
			}
		}(i, rule)
	}
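	// Record the total number of samples produced by this evaluation, then clean
	// up any series the group is tracking as stale.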
	if g.metrics != nil {
		g.metrics.GroupSamples.WithLabelValues(GroupKey(g.File(), g.Name())).Set(samplesTotal)
	}
	g.cleanupStaleSeries(ctx, ts)
}
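
For orientation, the sketch below shows the tick-driven calling pattern around an Eval-style method: evaluate on a fixed interval and stop when the context is cancelled. The evaluator interface, run helper, and printEvaluator type are illustrative assumptions for this sketch, not the actual rules.Group scheduler or its API.

package main

import (
	"context"
	"fmt"
	"time"
)

// evaluator is a hypothetical stand-in for anything with Group.Eval's shape.
type evaluator interface {
	Eval(ctx context.Context, ts time.Time)
}

// printEvaluator is a toy implementation that only records when it ran.
type printEvaluator struct{}

func (printEvaluator) Eval(_ context.Context, ts time.Time) {
	fmt.Println("evaluating at", ts.Format(time.RFC3339))
}

// run calls ev.Eval once per interval until ctx is cancelled, mirroring the
// tick-driven loop a rule-group scheduler would use around Eval.
func run(ctx context.Context, ev evaluator, interval time.Duration) {
	ticker := time.NewTicker(interval)
	defer ticker.Stop()
	for {
		select {
		case <-ctx.Done():
			return
		case t := <-ticker.C:
			ev.Eval(ctx, t)
		}
	}
}

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
	defer cancel()
	run(ctx, printEvaluator{}, time.Second)
}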