in datasource/etcd/sd/instance_event_handler.go [109:161]
// check is the event loop of the self preservation routine. It blocks until
// ctx is done and reacts to three other sources:
//
//   - iedh.evts: every event of a received batch is handed to recoverOrDefer.
//     Afterwards, if self preservation is not enabled yet and iedh.items is
//     not empty, the number of entries in iedh.items is compared with the
//     count returned by iedh.cache.GetAll(nil); self preservation is enabled
//     when that count exceeds selfPreservationInitCount and the entries reach
//     the iedh.Percent ratio of it. The check window timer is restarted at
//     most once per window.
//   - the window timer: entries in iedh.items are replayed one by one through
//     replayEvent while self preservation is disabled, otherwise
//     ReplayEvents is called.
//   - iedh.resetCh: ReplayEvents is called, self preservation is disabled and
//     the window timer is restarted.
func (iedh *InstanceEventDeferHandler) check(ctx context.Context) {
	defer log.Recover()

	window := time.NewTimer(deferCheckWindow)
	defer window.Stop()

	// windowArmed records whether the timer has already been restarted by an
	// incoming batch since it last fired, so later batches within the same
	// window do not keep pushing the deadline forward.
	windowArmed := false

	for {
		select {
		case <-ctx.Done():
			log.Error("self preservation routine dead", nil)
			return

		case batch := <-iedh.evts:
			for _, incoming := range batch {
				iedh.recoverOrDefer(incoming)
			}

			pending := len(iedh.items)
			// Nothing is pending, or self preservation is already on: there
			// is no threshold to evaluate and the timer is left alone.
			if pending == 0 || iedh.enabled {
				continue
			}

			cached := iedh.cache.GetAll(nil)
			reachedRatio := float64(pending) >= float64(cached)*iedh.Percent
			if cached > selfPreservationInitCount && reachedRatio {
				iedh.enabled = true
				msg := fmt.Sprintf("self preservation is enabled, caught %d/%d(>=%.0f%%) DELETE events",
					pending, cached, iedh.Percent*100)
				log.Warn(msg)
			}

			if windowArmed {
				continue
			}
			timeutil.ResetTimer(window, deferCheckWindow)
			windowArmed = true

		case <-window.C:
			windowArmed = false
			// The channel has just been drained by this receive, so a plain
			// Reset is sufficient here.
			window.Reset(deferCheckWindow)

			if iedh.enabled {
				iedh.ReplayEvents()
				continue
			}
			for _, deferred := range iedh.items {
				iedh.replayEvent(deferred.event)
			}

		case <-iedh.resetCh:
			iedh.ReplayEvents()
			iedh.enabled = false
			timeutil.ResetTimer(window, deferCheckWindow)
		}
	}
}