func()

in api/internal/core/store/store.go [358:392]


func (s *GenericStore) watch() context.CancelFunc {
	c, cancel := context.WithCancel(context.TODO())
	ch := s.Stg.Watch(c, s.opt.BasePath)
	go func() {
		defer func() {
			if !s.closing {
				log.Errorf("etcd watch exception closed, restarting: resource: %s", s.Type())
				storeNeedReInit = append(storeNeedReInit, s)
			}
		}()
		defer runtime.HandlePanic()
		for event := range ch {
			if event.Canceled {
				log.Warnf("etcd watch failed: %s", event.Error)
				return
			}

			for i := range event.Events {
				switch event.Events[i].Type {
				case storage.EventTypePut:
					key := event.Events[i].Key[len(s.opt.BasePath)+1:]
					objPtr, err := s.StringToObjPtr(event.Events[i].Value, key)
					if err != nil {
						log.Warnf("value convert to obj failed: %s", err)
						continue
					}
					s.cache.Store(key, objPtr)
				case storage.EventTypeDelete:
					s.cache.Delete(event.Events[i].Key[len(s.opt.BasePath)+1:])
				}
			}
		}
	}()
	return cancel
}