in pkg/datasource/etcdv3/etcdv3.go [75:102]
func (s *Etcdv3DataSource) processWatchResponse(resp *clientv3.WatchResponse) {
if resp.CompactRevision > s.lastUpdatedRevision {
s.lastUpdatedRevision = resp.CompactRevision
}
if resp.Header.GetRevision() > s.lastUpdatedRevision {
s.lastUpdatedRevision = resp.Header.GetRevision()
}
if err := resp.Err(); err != nil {
logging.Error(err, "Watch on etcd endpoints occur error", "endpoints", s.client.Endpoints())
return
}
for _, ev := range resp.Events {
if ev.Type == mvccpb.PUT {
err := s.doReadAndUpdate()
if err != nil {
logging.Error(err, "Fail to execute doReadAndUpdate for PUT event")
}
}
if ev.Type == mvccpb.DELETE {
updateErr := s.Handle(nil)
if updateErr != nil {
logging.Error(updateErr, "Fail to execute doReadAndUpdate for DELETE event")
}
}
}
}