in datasource/etcd/state/etcd/cacher_kv.go [159:199]
// handleEventBus consumes responses from eventBus until its resource stream
// ends, turning each response into a batch of kvstore events and handing that
// batch to c.sync. The bus is stopped when this method returns.
//
// It returns an error as soon as a nil response is received, and nil once the
// stream has been fully consumed.
func (c *KvCacher) handleEventBus(eventBus *sdcommon.EventBus) error {
	defer eventBus.Stop()

	for response := range eventBus.ResourceEventBus() {
		if response == nil {
			return errors.New("handle watcher error")
		}

		begin := time.Now()
		// Capture the revision up front so the summary log reports the value
		// seen when the response arrived.
		revision := response.Revision
		batch := make([]kvstore.Event, 0, len(response.Resources))

		for _, res := range response.Resources {
			ev := kvstore.NewEvent(rmodel.EVT_CREATE, nil, res.ModRevision)

			switch response.Action {
			case sdcommon.ActionPUT:
				// A PUT whose version is 1 is reported as a creation; any
				// other PUT is reported as an update.
				if res.Version == 1 {
					ev.Type = rmodel.EVT_CREATE
				} else {
					ev.Type = rmodel.EVT_UPDATE
				}
				ev.KV = c.doParse(res)
			case sdcommon.ActionDelete:
				ev.Type = rmodel.EVT_DELETE
				if res.Value != nil {
					ev.KV = c.doParse(res)
				} else {
					// No value accompanies the delete (this happens in embed
					// mode), so use the cached entry instead of unmarshalling.
					ev.KV = c.cache.Get(res.Key)
				}
			default:
				log.Error(fmt.Sprintf("unknown KeyValue %v", res), nil)
				continue
			}

			// Events without a key-value are logged and left out of the batch.
			if ev.KV == nil {
				log.Error(fmt.Sprintf("failed to parse KeyValue %v", res), nil)
				continue
			}
			batch = append(batch, ev)
		}

		c.sync(batch)
		log.DebugOrWarn(begin, fmt.Sprintf("finish to handle %d events, prefix: %s, rev: %d",
			len(batch), c.Cfg.Key, revision))
	}
	return nil
}