pkg/datasource/etcdv3/etcdv3.go (114 lines of code) (raw):

package etcdv3 import ( "context" "time" "github.com/alibaba/sentinel-golang/ext/datasource" "github.com/alibaba/sentinel-golang/logging" "github.com/alibaba/sentinel-golang/util" "github.com/coreos/etcd/clientv3" "github.com/coreos/etcd/mvcc/mvccpb" "github.com/pkg/errors" ) type Etcdv3DataSource struct { datasource.Base propertyKey string lastUpdatedRevision int64 client *clientv3.Client // cancel is the func, call cancel will stop watching on the propertyKey cancel context.CancelFunc // closed indicate whether continuing to watch on the propertyKey closed util.AtomicBool } // NewDataSource new a Etcdv3DataSource instance. // client is the etcdv3 client, it must be useful and should be release by User. func NewDataSource(client *clientv3.Client, key string, handlers ...datasource.PropertyHandler) (*Etcdv3DataSource, error) { if client == nil { return nil, errors.New("The etcdv3 client is nil.") } ds := &Etcdv3DataSource{ client: client, propertyKey: key, } for _, h := range handlers { ds.AddPropertyHandler(h) } return ds, nil } func (s *Etcdv3DataSource) Initialize() error { err := s.doReadAndUpdate() if err != nil { logging.Error(err, "Fail to update data for key when execute Etcdv3DataSource.Initialize()", "propertyKey", s.propertyKey) } go util.RunWithRecover(s.watch) return nil } func (s *Etcdv3DataSource) ReadSource() ([]byte, error) { ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) defer cancel() resp, err := s.client.Get(ctx, s.propertyKey) if err != nil { return nil, errors.Errorf("Fail to get value for property key[%s]", s.propertyKey) } if resp.Count == 0 { return nil, errors.Errorf("The key[%s] is not existed in etcd server.", s.propertyKey) } s.lastUpdatedRevision = resp.Header.GetRevision() logging.Info("[Etcdv3] Get the newest data for key", "propertyKey", s.propertyKey, "revision", resp.Header.GetRevision(), "value", resp.Kvs[0].Value) return resp.Kvs[0].Value, nil } func (s *Etcdv3DataSource) doReadAndUpdate() error { src, err := s.ReadSource() if err != nil { return err } return s.Handle(src) } func (s *Etcdv3DataSource) processWatchResponse(resp *clientv3.WatchResponse) { if resp.CompactRevision > s.lastUpdatedRevision { s.lastUpdatedRevision = resp.CompactRevision } if resp.Header.GetRevision() > s.lastUpdatedRevision { s.lastUpdatedRevision = resp.Header.GetRevision() } if err := resp.Err(); err != nil { logging.Error(err, "Watch on etcd endpoints occur error", "endpoints", s.client.Endpoints()) return } for _, ev := range resp.Events { if ev.Type == mvccpb.PUT { err := s.doReadAndUpdate() if err != nil { logging.Error(err, "Fail to execute doReadAndUpdate for PUT event") } } if ev.Type == mvccpb.DELETE { updateErr := s.Handle(nil) if updateErr != nil { logging.Error(updateErr, "Fail to execute doReadAndUpdate for DELETE event") } } } } func (s *Etcdv3DataSource) watch() { // Add watch for propertyKey from lastUpdatedRevision updated after Initializing ctx, cancel := context.WithCancel(context.Background()) s.cancel = cancel rch := s.client.Watch(ctx, s.propertyKey, clientv3.WithCreatedNotify(), clientv3.WithRev(s.lastUpdatedRevision)) for { for resp := range rch { s.processWatchResponse(&resp) } // Stop watching if datasource had been closed. if s.closed.Get() { return } time.Sleep(time.Duration(1) * time.Second) ctx, cancel = context.WithCancel(context.Background()) s.cancel = cancel if s.lastUpdatedRevision > 0 { rch = s.client.Watch(ctx, s.propertyKey, clientv3.WithRev(s.lastUpdatedRevision+1)) } else { rch = s.client.Watch(ctx, s.propertyKey) } } } func (s *Etcdv3DataSource) Close() error { // stop to watch property key. s.closed.Set(true) s.cancel() return nil }