pkg/datasource/nacos/nacos.go (95 lines of code) (raw):

package nacos import ( "fmt" "github.com/alibaba/sentinel-golang/ext/datasource" "github.com/alibaba/sentinel-golang/logging" "github.com/alibaba/sentinel-golang/util" "github.com/nacos-group/nacos-sdk-go/clients/config_client" "github.com/nacos-group/nacos-sdk-go/vo" "github.com/pkg/errors" ) type NacosDataSource struct { datasource.Base client config_client.IConfigClient isInitialized util.AtomicBool group string dataId string } func NewNacosDataSource(client config_client.IConfigClient, group, dataId string, handlers ...datasource.PropertyHandler) (*NacosDataSource, error) { if client == nil { return nil, errors.New("nil nacos config client") } if len(group) == 0 || len(dataId) == 0 { return nil, errors.New(fmt.Sprintf("invalid parameters, group: %s, dataId: %s", group, dataId)) } var ds = &NacosDataSource{ Base: datasource.Base{}, client: client, group: group, dataId: dataId, } for _, h := range handlers { ds.AddPropertyHandler(h) } return ds, nil } func (s *NacosDataSource) Initialize() error { if !s.isInitialized.CompareAndSet(false, true) { return nil } data, err := s.ReadSource() if err != nil { return err } if err = s.doUpdate(data); err != nil { return err } err = s.listen(s.client) if err == nil { logging.Info("[Nacos] Nacos data source is successfully initialized", "group", s.group, "dataId", s.dataId) } return err } func (s *NacosDataSource) ReadSource() ([]byte, error) { content, err := s.client.GetConfig(vo.ConfigParam{ DataId: s.dataId, Group: s.group, }) if err != nil { return nil, errors.Errorf("Failed to read the nacos data source when initialization, err: %+v", err) } logging.Info("[Nacos] Succeed to read source", "group", s.group, "dataId", s.dataId, "content", content) return []byte(content), err } func (s *NacosDataSource) doUpdate(data []byte) error { return s.Handle(data) } func (s *NacosDataSource) listen(client config_client.IConfigClient) (err error) { listener := vo.ConfigParam{ DataId: s.dataId, Group: s.group, OnChange: func(namespace, group, dataId, data string) { logging.Info("[Nacos] Receive listened property", "namespace", namespace, "group", group, "dataId", dataId, "data", data) err := s.doUpdate([]byte(data)) if err != nil { logging.Error(err, "Fail to update data source in NacosDataSource.listen()") } }, } err = client.ListenConfig(listener) if err != nil { return errors.Errorf("Failed to listen to the nacos data source, err: %+v", err) } return nil } func (s *NacosDataSource) Close() error { err := s.client.CancelListenConfig(vo.ConfigParam{ DataId: s.dataId, Group: s.group, }) if err != nil { return errors.Errorf("Failed to cancel listen to the nacos data source, err: %+v", err) } logging.Info("[Nacos] The nacos datasource had been closed", "group", s.group, "dataId", s.dataId) return nil }