in dispatch/dispatch.go [147:198]
// run is the dispatcher's main loop. It receives alerts from it and passes
// each one to processAlert once per route returned by d.route.Match. Every
// 30 seconds it also stops and removes the aggregation groups that report
// themselves as empty. The loop ends when the iterator's channel is closed or
// when d.ctx is done; the iterator is closed and the ticker stopped on exit.
func (d *Dispatcher) run(it provider.AlertIterator) {
	gcTicker := time.NewTicker(30 * time.Second)
	defer gcTicker.Stop()
	defer it.Close()

	// sweepEmptyGroups stops and drops every empty aggregation group while
	// holding d.mtx, adjusting the group counter and gauge for each removal.
	sweepEmptyGroups := func() {
		d.mtx.Lock()
		for _, byFingerprint := range d.aggrGroupsPerRoute {
			for _, group := range byFingerprint {
				if !group.empty() {
					continue
				}
				group.stop()
				delete(byFingerprint, group.fingerprint())
				d.aggrGroupsNum--
				d.metrics.aggrGroups.Dec()
			}
		}
		d.mtx.Unlock()
	}

	for {
		select {
		case incoming, open := <-it.Next():
			if !open {
				// The alert channel was closed: report a pending iterator
				// error, if any, and shut the loop down.
				if closeErr := it.Err(); closeErr != nil {
					level.Error(d.logger).Log("msg", "Error on alert update", "err", closeErr)
				}
				return
			}
			level.Debug(d.logger).Log("msg", "Received alert", "alert", incoming)

			// An iterator error here is not fatal: log it, skip this alert
			// and wait for the next one.
			if iterErr := it.Err(); iterErr != nil {
				level.Error(d.logger).Log("msg", "Error on alert update", "err", iterErr)
				continue
			}

			// Time how long it takes to hand the alert to all matching routes.
			started := time.Now()
			for _, matched := range d.route.Match(incoming.Labels) {
				d.processAlert(incoming, matched)
			}
			elapsed := time.Since(started)
			d.metrics.processingDuration.Observe(elapsed.Seconds())

		case <-gcTicker.C:
			sweepEmptyGroups()

		case <-d.ctx.Done():
			return
		}
	}
}