func()

in datasource/etcd/state/etcd/cacher_kv.go [159:199]


func (c *KvCacher) handleEventBus(eventBus *sdcommon.EventBus) error {
	defer eventBus.Stop()
	for resp := range eventBus.ResourceEventBus() {
		if resp == nil {
			return errors.New("handle watcher error")
		}

		start := time.Now()
		rev := resp.Revision
		evts := make([]kvstore.Event, 0, len(resp.Resources))
		for _, resource := range resp.Resources {
			evt := kvstore.NewEvent(rmodel.EVT_CREATE, nil, resource.ModRevision)
			switch {
			case resp.Action == sdcommon.ActionPUT && resource.Version == 1:
				evt.Type, evt.KV = rmodel.EVT_CREATE, c.doParse(resource)
			case resp.Action == sdcommon.ActionPUT:
				evt.Type, evt.KV = rmodel.EVT_UPDATE, c.doParse(resource)
			case resp.Action == sdcommon.ActionDelete:
				evt.Type = rmodel.EVT_DELETE
				if resource.Value == nil {
					// it will happen in embed mode, and then need to get the cache value not unmarshal
					evt.KV = c.cache.Get(resource.Key)
				} else {
					evt.KV = c.doParse(resource)
				}
			default:
				log.Error(fmt.Sprintf("unknown KeyValue %v", resource), nil)
				continue
			}
			if evt.KV == nil {
				log.Error(fmt.Sprintf("failed to parse KeyValue %v", resource), nil)
				continue
			}
			evts = append(evts, evt)
		}
		c.sync(evts)
		log.DebugOrWarn(start, fmt.Sprintf("finish to handle %d events, prefix: %s, rev: %d",
			len(evts), c.Cfg.Key, rev))
	}
	return nil
}