func()

in api/internal/core/storage/etcd.go [181:235]


func (s *EtcdV3Storage) Watch(ctx context.Context, key string) <-chan WatchResponse {
	eventChan := s.client.Watch(ctx, key, clientv3.WithPrefix())
	ch := make(chan WatchResponse, 1)
	go func() {
		defer runtime.HandlePanic()
		for event := range eventChan {
			if event.Err() != nil {
				log.Errorf("etcd watch error: key: %s err: %v", key, event.Err())
				close(ch)
				return
			}

			output := WatchResponse{
				Canceled: event.Canceled,
			}

			for i := range event.Events {
				key := string(event.Events[i].Kv.Key)
				value := string(event.Events[i].Kv.Value)

				// Skip the data if its value is init_dir or {}
				// during watching phase.
				//
				// For more complex cases, an explicit function to determine if
				// skippable would be better.
				if value == SkippedValueEtcdInitDir || value == SkippedValueEtcdEmptyObject {
					continue
				}

				e := Event{
					Keypair: Keypair{
						Key:   key,
						Value: value,
					},
				}
				switch event.Events[i].Type {
				case clientv3.EventTypePut:
					e.Type = EventTypePut
				case clientv3.EventTypeDelete:
					e.Type = EventTypeDelete
				}
				output.Events = append(output.Events, e)
			}
			if output.Canceled {
				log.Error("channel canceled")
				output.Error = fmt.Errorf("channel canceled")
			}
			ch <- output
		}

		close(ch)
	}()

	return ch
}