in datasource/etcd/state/etcd/cacher_kv.go [303:347]
func (c *KvCacher) filterCreateOrUpdate(newStore map[string]*sdcommon.Resource,
rev int64, eventsCh chan [sdcommon.EventBlockSize]kvstore.Event, filterStopCh chan struct{}) {
var block [sdcommon.EventBlockSize]kvstore.Event
i := 0
for k, v := range newStore {
ov := c.cache.Get(k)
if ov == nil {
if i >= sdcommon.EventBlockSize {
eventsCh <- block
block = [sdcommon.EventBlockSize]kvstore.Event{}
i = 0
}
if kv := c.doParse(v); kv != nil {
block[i] = kvstore.NewEvent(rmodel.EVT_CREATE, kv, rev)
i++
}
continue
}
if ov.CreateRevision == v.CreateRevision && ov.ModRevision == v.ModRevision {
continue
}
if i >= sdcommon.EventBlockSize {
eventsCh <- block
block = [sdcommon.EventBlockSize]kvstore.Event{}
i = 0
}
if kv := c.doParse(v); kv != nil {
block[i] = kvstore.NewEvent(rmodel.EVT_UPDATE, kv, rev)
i++
}
}
if i > 0 {
eventsCh <- block
}
<-filterStopCh
close(eventsCh)
}