in datasource/etcd/state/etcd/cacher_kv.go [411:447]
func (c *KvCacher) buildCache(evts []kvstore.Event) {
init := !c.IsReady()
for i, evt := range evts {
key := util.BytesToStringWithNoCopy(evt.KV.Key)
prevKv := c.cache.Get(key)
ok := prevKv != nil
switch evt.Type {
case rmodel.EVT_CREATE, rmodel.EVT_UPDATE:
switch {
case init:
evt.Type = rmodel.EVT_INIT
case !ok && evt.Type != rmodel.EVT_CREATE:
log.Warn(fmt.Sprintf("unexpected %s event! it should be %s key %s",
evt.Type, rmodel.EVT_CREATE, key))
evt.Type = rmodel.EVT_CREATE
case ok && evt.Type != rmodel.EVT_UPDATE:
log.Warn(fmt.Sprintf("unexpected %s event! it should be %s key %s",
evt.Type, rmodel.EVT_UPDATE, key))
evt.Type = rmodel.EVT_UPDATE
}
c.cache.Put(key, evt.KV)
evts[i] = evt
case rmodel.EVT_DELETE:
if !ok {
log.Warn(fmt.Sprintf("unexpected %s event! key %s does not cache",
evt.Type, key))
} else {
evt.KV = prevKv
c.cache.Remove(key)
}
evts[i] = evt
}
}
kvstore.ReportProcessEventCompleted(c.Cfg.Key, evts)
}