in api/internal/core/storage/etcd.go [181:235]
// Watch subscribes to changes on every etcd key that has key as a prefix and
// relays them, translated into WatchResponse values, on the returned channel.
//
// Events whose value equals SkippedValueEtcdInitDir or
// SkippedValueEtcdEmptyObject are dropped from the relayed batch. A batch is
// still delivered when all of its events were dropped.
//
// The returned channel is closed when the underlying etcd watch channel is
// closed, when a watch response reports an error (the error is logged, not
// forwarded), or when ctx is done while a response is waiting to be
// delivered. Callers can therefore range over the channel.
func (s *EtcdV3Storage) Watch(ctx context.Context, key string) <-chan WatchResponse {
	eventChan := s.client.Watch(ctx, key, clientv3.WithPrefix())
	ch := make(chan WatchResponse, 1)
	go func() {
		defer runtime.HandlePanic()
		// Registered after HandlePanic so that it runs first: the channel is
		// closed exactly once on every exit path, including a panic in the
		// loop below, and consumers ranging over it are never left hanging.
		defer close(ch)

		for event := range eventChan {
			if err := event.Err(); err != nil {
				log.Errorf("etcd watch error: key: %s err: %v", key, err)
				return
			}

			output := WatchResponse{
				Canceled: event.Canceled,
			}
			for i := range event.Events {
				ev := event.Events[i]
				evKey := string(ev.Kv.Key)
				evValue := string(ev.Kv.Value)
				// Skip the data if its value is init_dir or {}
				// during watching phase.
				//
				// For more complex cases, an explicit function to determine if
				// skippable would be better.
				if evValue == SkippedValueEtcdInitDir || evValue == SkippedValueEtcdEmptyObject {
					continue
				}

				e := Event{
					Keypair: Keypair{
						Key:   evKey,
						Value: evValue,
					},
				}
				// Any other etcd event type leaves e.Type at its zero value.
				switch ev.Type {
				case clientv3.EventTypePut:
					e.Type = EventTypePut
				case clientv3.EventTypeDelete:
					e.Type = EventTypeDelete
				}
				output.Events = append(output.Events, e)
			}

			// NOTE(review): clientv3's WatchResponse.Err() appears to be
			// non-nil for canceled responses, in which case the early return
			// above makes this branch unreachable — confirm against the
			// client version in use.
			if output.Canceled {
				log.Error("channel canceled")
				output.Error = fmt.Errorf("channel canceled")
			}

			// Do not block forever on a consumer that has stopped reading:
			// give up as soon as the caller's context is done so this
			// goroutine cannot leak.
			select {
			case ch <- output:
			case <-ctx.Done():
				return
			}
		}
	}()
	return ch
}