func()

in datasource/etcd/state/etcd/cacher_kv.go [303:347]


// filterCreateOrUpdate walks newStore and compares every entry with the
// current cache content, producing an event for each entry that is new or
// whose revisions changed:
//
//   - key not found in c.cache            -> rmodel.EVT_CREATE
//   - CreateRevision or ModRevision differ -> rmodel.EVT_UPDATE
//   - both revisions equal                 -> no event
//
// Entries for which c.doParse returns nil are dropped silently. Events are
// collected into fixed-size blocks of sdcommon.EventBlockSize and each block
// is sent on eventsCh by value; the trailing, partially filled block is sent
// only if it holds at least one event. Every event carries rev.
//
// Once the scan is finished the function blocks until it can receive from
// filterStopCh, and only then closes eventsCh.
func (c *KvCacher) filterCreateOrUpdate(newStore map[string]*sdcommon.Resource,
	rev int64, eventsCh chan [sdcommon.EventBlockSize]kvstore.Event, filterStopCh chan struct{}) {
	var (
		batch [sdcommon.EventBlockSize]kvstore.Event
		used  int
	)

	for key, res := range newStore {
		cached := c.cache.Get(key)
		created := cached == nil

		// An entry already cached with identical revisions has not changed.
		if !created &&
			cached.CreateRevision == res.CreateRevision &&
			cached.ModRevision == res.ModRevision {
			continue
		}

		// Hand off a full batch before writing the next slot. This happens
		// before parsing, so a full batch is flushed even when the current
		// entry turns out to be unparsable.
		if used >= sdcommon.EventBlockSize {
			eventsCh <- batch
			batch = [sdcommon.EventBlockSize]kvstore.Event{}
			used = 0
		}

		parsed := c.doParse(res)
		if parsed == nil {
			continue
		}

		if created {
			batch[used] = kvstore.NewEvent(rmodel.EVT_CREATE, parsed, rev)
		} else {
			batch[used] = kvstore.NewEvent(rmodel.EVT_UPDATE, parsed, rev)
		}
		used++
	}

	// Send the remainder, if any.
	if used > 0 {
		eventsCh <- batch
	}

	// Wait for the stop signal before closing the output channel.
	<-filterStopCh

	close(eventsCh)
}