in rules/manager.go [629:764]
// Eval runs one evaluation pass over every rule of the group, in order, using
// ts as the evaluation timestamp.
//
// For each rule it:
//   - evaluates the rule through g.opts.QueryFunc and records health, last
//     error, evaluation duration and the wall-clock start time on the rule;
//   - for alerting rules, calls sendAlerts;
//   - appends the returned samples through g.opts.Appendable and commits them;
//   - appends a stale marker at ts for every series the rule returned in the
//     previous pass (g.seriesInPreviousEval) but did not return this time.
//
// The pass returns immediately, skipping the group-level bookkeeping at the
// end, as soon as g.done becomes readable between two rules.
func (g *Group) Eval(ctx context.Context, ts time.Time) {
	var samplesTotal float64
	for i, rule := range g.rules {
		// Non-blocking check: stop before starting another rule if the group
		// has been asked to shut down.
		select {
		case <-g.done:
			return
		default:
		}

		// Each rule runs inside its own function so that the deferred
		// bookkeeping (commit, span end, timing) fires per rule rather than
		// at the end of the whole pass.
		func(i int, rule Rule) {
			ctx, sp := otel.Tracer("").Start(ctx, "rule")
			sp.SetAttributes(attribute.String("name", rule.Name()))
			// Registered first, therefore executed last: it runs after the
			// deferred commit below, so a commit failure can still set the
			// span status before the span is ended.
			defer func(t time.Time) {
				sp.End()

				since := time.Since(t)
				g.metrics.EvalDuration.Observe(since.Seconds())
				rule.SetEvaluationDuration(since)
				rule.SetEvaluationTimestamp(t)
			}(time.Now())

			g.metrics.EvalTotal.WithLabelValues(GroupKey(g.File(), g.Name())).Inc()

			vector, err := rule.Eval(ctx, ts, g.opts.QueryFunc, g.opts.ExternalURL, g.Limit())
			if err != nil {
				rule.SetHealth(HealthBad)
				rule.SetLastError(err)
				sp.SetStatus(codes.Error, err.Error())
				g.metrics.EvalFailures.WithLabelValues(GroupKey(g.File(), g.Name())).Inc()

				// Canceled queries are intentional termination of queries. This normally
				// happens on shutdown and thus we skip logging of any errors here.
				var eqc promql.ErrQueryCanceled
				if !errors.As(err, &eqc) {
					level.Warn(g.logger).Log("name", rule.Name(), "index", i, "msg", "Evaluating rule failed", "rule", rule, "err", err)
				}
				return
			}
			rule.SetHealth(HealthGood)
			rule.SetLastError(nil)
			samplesTotal += float64(len(vector))

			if ar, ok := rule.(*AlertingRule); ok {
				ar.sendAlerts(ctx, ts, g.opts.ResendDelay, g.interval, g.opts.NotifyFunc)
			}

			var (
				numOutOfOrder = 0
				numTooOld     = 0
				numDuplicates = 0
			)

			app := g.opts.Appendable.Appender(ctx)
			seriesReturned := make(map[string]labels.Labels, len(g.seriesInPreviousEval[i]))
			// Commit whatever was appended once this rule is done. The set of
			// returned series is only remembered for the next pass when the
			// commit succeeded.
			defer func() {
				if err := app.Commit(); err != nil {
					rule.SetHealth(HealthBad)
					rule.SetLastError(err)
					sp.SetStatus(codes.Error, err.Error())
					g.metrics.EvalFailures.WithLabelValues(GroupKey(g.File(), g.Name())).Inc()

					level.Warn(g.logger).Log("name", rule.Name(), "index", i, "msg", "Rule sample appending failed", "err", err)
					return
				}
				g.seriesInPreviousEval[i] = seriesReturned
			}()

			// Scratch space for building map keys. It is shared by all samples
			// of this rule: the string conversion below copies the bytes, so
			// no key aliases the buffer.
			var buf [1024]byte
			for _, s := range vector {
				if s.H != nil {
					_, err = app.AppendHistogram(0, s.Metric, s.T, nil, s.H)
				} else {
					_, err = app.Append(0, s.Metric, s.T, s.F)
				}

				if err == nil {
					seriesReturned[string(s.Metric.Bytes(buf[:]))] = s.Metric
					continue
				}

				rule.SetHealth(HealthBad)
				rule.SetLastError(err)
				sp.SetStatus(codes.Error, err.Error())

				// errors.Is already walks the wrap chain, so err is matched
				// directly. The three known storage errors are counted and
				// logged at debug level (a per-rule summary is emitted below);
				// anything else is logged as a warning for each sample.
				discardLog := level.Debug(g.logger)
				switch {
				case errors.Is(err, storage.ErrOutOfOrderSample):
					numOutOfOrder++
				case errors.Is(err, storage.ErrTooOldSample):
					numTooOld++
				case errors.Is(err, storage.ErrDuplicateSampleForTimestamp):
					numDuplicates++
				default:
					discardLog = level.Warn(g.logger)
				}
				discardLog.Log("name", rule.Name(), "index", i, "msg", "Rule evaluation result discarded", "err", err, "sample", s)
			}

			if numOutOfOrder > 0 {
				level.Warn(g.logger).Log("name", rule.Name(), "index", i, "msg", "Error on ingesting out-of-order result from rule evaluation", "numDropped", numOutOfOrder)
			}
			if numTooOld > 0 {
				level.Warn(g.logger).Log("name", rule.Name(), "index", i, "msg", "Error on ingesting too old result from rule evaluation", "numDropped", numTooOld)
			}
			if numDuplicates > 0 {
				level.Warn(g.logger).Log("name", rule.Name(), "index", i, "msg", "Error on ingesting results from rule evaluation with different value but same timestamp", "numDropped", numDuplicates)
			}

			for metric, lset := range g.seriesInPreviousEval[i] {
				if _, ok := seriesReturned[metric]; ok {
					continue
				}
				// Series no longer exposed, mark it stale.
				_, err = app.Append(0, lset, timestamp.FromTime(ts), math.Float64frombits(value.StaleNaN))
				switch {
				case err == nil:
				case errors.Is(err, storage.ErrOutOfOrderSample),
					errors.Is(err, storage.ErrTooOldSample),
					errors.Is(err, storage.ErrDuplicateSampleForTimestamp):
					// Do not count these in logging, as this is expected if series
					// is exposed from a different rule.
				default:
					level.Warn(g.logger).Log("name", rule.Name(), "index", i, "msg", "Adding stale sample failed", "sample", lset.String(), "err", err)
				}
			}
		}(i, rule)
	}

	// NOTE(review): g.metrics is dereferenced without a nil check inside the
	// per-rule closure above, so this guard only has an effect when no rule
	// was evaluated — confirm whether g.metrics can actually be nil here.
	if g.metrics != nil {
		g.metrics.GroupSamples.WithLabelValues(GroupKey(g.File(), g.Name())).Set(samplesTotal)
	}
	g.cleanupStaleSeries(ctx, ts)
}