func()

in pkg/datasource/etcdv3/etcdv3.go [75:102]


// processWatchResponse applies a single etcd watch response to the data source.
//
// It first raises s.lastUpdatedRevision to the response's compact revision
// and/or header revision whenever either is larger than the stored value; the
// stored revision is never moved backwards, and this bookkeeping runs even if
// the response carries an error. If resp.Err() reports an error, it is logged
// together with the client's endpoints and no events are processed.
//
// Otherwise the events are handled in order: a PUT event triggers
// s.doReadAndUpdate, and a DELETE event calls s.Handle(nil). A failure in
// either is logged and does not stop processing of the remaining events.
func (s *Etcdv3DataSource) processWatchResponse(resp *clientv3.WatchResponse) {
	// Record the newest revision observed before looking at the error state,
	// so the revision is tracked for failed responses as well.
	if resp.CompactRevision > s.lastUpdatedRevision {
		s.lastUpdatedRevision = resp.CompactRevision
	}
	if rev := resp.Header.GetRevision(); rev > s.lastUpdatedRevision {
		s.lastUpdatedRevision = rev
	}

	if err := resp.Err(); err != nil {
		logging.Error(err, "Watch on etcd endpoints occur error", "endpoints", s.client.Endpoints())
		return
	}

	for _, ev := range resp.Events {
		switch ev.Type {
		case mvccpb.PUT:
			if err := s.doReadAndUpdate(); err != nil {
				logging.Error(err, "Fail to execute doReadAndUpdate for PUT event")
			}
		case mvccpb.DELETE:
			// The DELETE path does not re-read from etcd; it passes nil
			// straight to Handle, so the log message names Handle rather
			// than doReadAndUpdate.
			if err := s.Handle(nil); err != nil {
				logging.Error(err, "Fail to execute Handle for DELETE event")
			}
		}
	}
}