in api/internal/core/store/store.go [358:392]
func (s *GenericStore) watch() context.CancelFunc {
c, cancel := context.WithCancel(context.TODO())
ch := s.Stg.Watch(c, s.opt.BasePath)
go func() {
defer func() {
if !s.closing {
log.Errorf("etcd watch exception closed, restarting: resource: %s", s.Type())
storeNeedReInit = append(storeNeedReInit, s)
}
}()
defer runtime.HandlePanic()
for event := range ch {
if event.Canceled {
log.Warnf("etcd watch failed: %s", event.Error)
return
}
for i := range event.Events {
switch event.Events[i].Type {
case storage.EventTypePut:
key := event.Events[i].Key[len(s.opt.BasePath)+1:]
objPtr, err := s.StringToObjPtr(event.Events[i].Value, key)
if err != nil {
log.Warnf("value convert to obj failed: %s", err)
continue
}
s.cache.Store(key, objPtr)
case storage.EventTypeDelete:
s.cache.Delete(event.Events[i].Key[len(s.opt.BasePath)+1:])
}
}
}
}()
return cancel
}