func()

in datasource/etcd/sd/instance_event_handler.go [109:161]


func (iedh *InstanceEventDeferHandler) check(ctx context.Context) {
	defer log.Recover()
	t, n := time.NewTimer(deferCheckWindow), false
	defer t.Stop()
	for {
		select {
		case <-ctx.Done():
			log.Error("self preservation routine dead", nil)
			return
		case evts := <-iedh.evts:
			for _, evt := range evts {
				iedh.recoverOrDefer(evt)
			}

			del := len(iedh.items)
			if del == 0 {
				continue
			}

			if iedh.enabled {
				continue
			}

			total := iedh.cache.GetAll(nil)
			if total > selfPreservationInitCount && float64(del) >= float64(total)*iedh.Percent {
				iedh.enabled = true
				log.Warn(fmt.Sprintf("self preservation is enabled, caught %d/%d(>=%.0f%%) DELETE events",
					del, total, iedh.Percent*100))
			}

			if !n {
				timeutil.ResetTimer(t, deferCheckWindow)
				n = true
			}
		case <-t.C:
			n = false
			t.Reset(deferCheckWindow)

			if !iedh.enabled {
				for _, item := range iedh.items {
					iedh.replayEvent(item.event)
				}
				continue
			}

			iedh.ReplayEvents()
		case <-iedh.resetCh:
			iedh.ReplayEvents()
			iedh.enabled = false
			timeutil.ResetTimer(t, deferCheckWindow)
		}
	}
}