pkg/datasource/consul/consul.go (115 lines of code) (raw):
package consul
import (
"context"
"errors"
"time"
"github.com/alibaba/sentinel-golang/ext/datasource"
"github.com/alibaba/sentinel-golang/logging"
"github.com/alibaba/sentinel-golang/util"
"github.com/hashicorp/consul/api"
)
// consulDataSource is a datasource.DataSource implementation that reads a
// single key from a Consul KV store and pushes its value to the property
// handlers registered on the embedded datasource.Base.
type consulDataSource struct {
	datasource.Base

	// propertyKey is the KV key passed to kvQuerier.Get on every read.
	propertyKey string
	// kvQuerier performs the KV lookups; newConsulDataSource sets it from
	// the Consul client's KV() handle.
	kvQuerier KVQuerier
	// queryOptions is sent with every lookup. It carries the cancellable
	// context created in newConsulDataSource, and ReadSource updates its
	// WaitIndex after each successful query.
	queryOptions api.QueryOptions

	// cancel cancels the context stored in queryOptions; Close invokes it.
	cancel context.CancelFunc
	// isInitialized ensures Initialize only takes effect once.
	isInitialized util.AtomicBool
}
// Sentinel errors reported by the Consul data source.
var (
	// ErrInvalidConsulConfig is returned by NewDataSource when neither a
	// Consul client nor a Consul config was supplied.
	ErrInvalidConsulConfig = errors.New("invalid consul config")
	// ErrNilConsulClient is returned by NewDataSource when no usable Consul
	// client is available after option processing.
	ErrNilConsulClient = errors.New("nil consul client")
	// ErrKeyDoesNotExist is returned by ReadSource when the lookup succeeds
	// but yields no KV pair for the property key.
	ErrKeyDoesNotExist = errors.New("key does not exist")
)
// NewDataSource creates a Consul-backed data source for propertyKey.
//
// The client is taken from the supplied options when present; otherwise one
// is built from the configured Consul config via api.NewClient.
//
// It returns ErrInvalidConsulConfig when neither a client nor a config was
// provided, the api.NewClient error when client construction fails, and
// ErrNilConsulClient if the client is still nil afterwards.
func NewDataSource(propertyKey string, opts ...Option) (datasource.DataSource, error) {
	cfg := evaluateOptions(opts)

	// Without a user-provided client, fall back to building one from the
	// configuration.
	if cfg.consulClient == nil {
		if cfg.consulConfig == nil {
			return nil, ErrInvalidConsulConfig
		}
		created, err := api.NewClient(cfg.consulConfig)
		if err != nil {
			return nil, err
		}
		cfg.consulClient = created
	}

	// Defensive guard: refuse to continue without a client.
	if cfg.consulClient == nil {
		return nil, ErrNilConsulClient
	}

	return newConsulDataSource(propertyKey, cfg), nil
}
// newConsulDataSource assembles a consulDataSource from already validated
// options. It derives a cancellable context from the context carried by the
// configured query options, stores a copy of those options bound to the new
// context, and registers every configured property handler.
func newConsulDataSource(propertyKey string, options *options) *consulDataSource {
	watchCtx, stop := context.WithCancel(options.queryOptions.Context())

	source := new(consulDataSource)
	source.propertyKey = propertyKey
	source.kvQuerier = options.consulClient.KV()
	source.cancel = stop
	// WithContext returns a copy, so the caller's query options are not
	// modified.
	source.queryOptions = *options.queryOptions.WithContext(watchCtx)

	for i := range options.propertyHandlers {
		source.AddPropertyHandler(options.propertyHandlers[i])
	}
	return source
}
// ReadSource queries the KV store for propertyKey using c.queryOptions and
// returns the raw value of the key.
//
// After a successful query the stored WaitIndex is synchronised with the
// index reported in the query metadata, so that the next call can be issued
// as a Consul blocking query relative to that index.
//
// It returns the querier's error unchanged when the lookup fails, and
// ErrKeyDoesNotExist when the lookup succeeds but no pair is returned.
func (c *consulDataSource) ReadSource() ([]byte, error) {
	pair, meta, err := c.kvQuerier.Get(c.propertyKey, &c.queryOptions)
	if err != nil {
		return nil, err
	}

	// KVQuerier is declared outside this file, so an implementation other
	// than the Consul client may return nil metadata; keep the previous
	// index in that case instead of dereferencing a nil pointer.
	if meta != nil {
		if meta.LastIndex < c.queryOptions.WaitIndex {
			// Consul's blocking-query guidance: if the index goes backwards
			// (e.g. after a server restore), reset to 0 so the next query
			// returns immediately rather than waiting on a stale index.
			c.queryOptions.WaitIndex = 0
		} else {
			c.queryOptions.WaitIndex = meta.LastIndex
		}
	}

	if pair == nil {
		return nil, ErrKeyDoesNotExist
	}
	return pair.Value, nil
}
// Initialize implements the datasource.DataSource interface.
//
// The first call performs one read-and-update and then starts the watch loop
// in a background goroutine wrapped by util.RunWithRecover. Any later call
// returns an error and does nothing else.
func (c *consulDataSource) Initialize() error {
	if firstCall := c.isInitialized.CompareAndSet(false, true); !firstCall {
		return errors.New("consul datasource had been initialized")
	}

	// A failed initial read shouldn't block initialization: log it and let
	// the watch loop try again.
	if initErr := c.doReadAndUpdate(); initErr != nil {
		logging.Error(initErr, "Failed to read initial data for key in consulDataSource.Initialize()", "propertyKey", c.propertyKey)
	}

	go util.RunWithRecover(c.watch)
	return nil
}
// watch repeatedly reads the property and pushes it to the handlers until
// the data source's context is cancelled (see Close).
//
// Error handling per iteration:
//   - context.Canceled: the loop exits.
//   - ErrKeyDoesNotExist: the read is retried immediately; ReadSource has
//     already advanced WaitIndex, so the next query is expected to block
//     on the Consul side until the key changes.
//   - any other error: it is logged (retryable ones as warnings, the rest
//     once via logging.FrequentErrorOnce) and the loop pauses before
//     retrying, so a persistent failure such as a permission error cannot
//     turn into a busy loop against the Consul agent.
//
// The pause is cancellation-aware, so Close does not have to wait for a
// sleeping loop, and a context that is already done ends the loop.
func (c *consulDataSource) watch() {
	logging.Info("[Consul] Consul data source is watching property", "propertyKey", c.propertyKey)

	// Delay between attempts after a failed read/update.
	const retryInterval = time.Second

	ctx := c.queryOptions.Context()
	for {
		err := c.doReadAndUpdate()
		switch {
		case err == nil:
			continue
		case errors.Is(err, context.Canceled):
			return
		case errors.Is(err, ErrKeyDoesNotExist):
			continue
		case api.IsRetryableError(err):
			logging.Warn("[Consul] Update failed with retryable error in consulDataSource.watch()", "err", err)
		default:
			logging.FrequentErrorOnce.Do(func() {
				logging.Error(err, "Failed to update data in consulDataSource.watch()", "propertyKey", c.propertyKey)
			})
		}

		// Back off before the next attempt, but stop as soon as the
		// context is done.
		select {
		case <-ctx.Done():
			return
		case <-time.After(retryInterval):
		}
	}
}
// doUpdate forwards src to c.Handle (not defined in this file; presumably
// promoted from the embedded datasource.Base). An empty payload is
// normalised to nil before being handed over.
func (c *consulDataSource) doUpdate(src []byte) (err error) {
	payload := src
	if len(payload) == 0 {
		payload = nil
	}
	return c.Handle(payload)
}
// doReadAndUpdate reads the current value with ReadSource and, when the read
// succeeds, passes it to doUpdate. A read error is returned unchanged and no
// update is attempted.
func (c *consulDataSource) doReadAndUpdate() (err error) {
	data, readErr := c.ReadSource()
	if readErr == nil {
		return c.doUpdate(data)
	}
	return readErr
}
// Close cancels the context used for the data source's queries, which makes
// the watch loop exit once it observes context.Canceled. It always returns
// nil.
func (c *consulDataSource) Close() error {
	if stop := c.cancel; stop != nil {
		stop()
	}
	logging.Info("[Consul] Consul data source has been closed", "propertyKey", c.propertyKey)
	return nil
}