registry/nacos/registry.go (298 lines of code) (raw):
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package nacos
import (
"bytes"
"fmt"
"math"
"strconv"
"strings"
"time"
)
import (
nacosClient "github.com/dubbogo/gost/database/kv/nacos"
"github.com/dubbogo/gost/log/logger"
"github.com/nacos-group/nacos-sdk-go/v2/vo"
perrors "github.com/pkg/errors"
)
import (
"dubbo.apache.org/dubbo-go/v3/common"
"dubbo.apache.org/dubbo-go/v3/common/constant"
"dubbo.apache.org/dubbo-go/v3/common/extension"
"dubbo.apache.org/dubbo-go/v3/metrics"
metricsRegistry "dubbo.apache.org/dubbo-go/v3/metrics/registry"
"dubbo.apache.org/dubbo-go/v3/registry"
"dubbo.apache.org/dubbo-go/v3/remoting"
"dubbo.apache.org/dubbo-go/v3/remoting/nacos"
)
const (
RegistryConnDelay = 3
)
func init() {
extension.SetRegistry(constant.NacosKey, newNacosRegistry)
}
type nacosRegistry struct {
*common.URL
namingClient *nacosClient.NacosNamingClient
registryUrls []*common.URL
}
func getCategory(url *common.URL) string {
role, _ := strconv.Atoi(url.GetParam(constant.RegistryRoleKey, strconv.Itoa(constant.NacosDefaultRoleType)))
category := common.DubboNodes[role]
return category
}
func getServiceName(url *common.URL) string {
var buffer bytes.Buffer
buffer.Write([]byte(getCategory(url)))
appendParam(&buffer, url, constant.InterfaceKey)
appendParam(&buffer, url, constant.VersionKey)
appendParam(&buffer, url, constant.GroupKey)
return buffer.String()
}
func appendParam(target *bytes.Buffer, url *common.URL, key string) {
value := url.GetParam(key, "")
target.Write([]byte(constant.NacosServiceNameSeparator))
if strings.TrimSpace(value) != "" {
target.Write([]byte(value))
}
}
func createRegisterParam(url *common.URL, serviceName string, groupName string) vo.RegisterInstanceParam {
category := getCategory(url)
params := make(map[string]string)
url.RangeParams(func(key, value string) bool {
params[key] = value
return true
})
params[constant.NacosCategoryKey] = category
params[constant.NacosProtocolKey] = url.Protocol
params[constant.NacosPathKey] = url.Path
params[constant.MethodsKey] = strings.Join(url.Methods, ",")
common.HandleRegisterIPAndPort(url)
port, _ := strconv.Atoi(url.Port)
weightStr := url.GetParam(constant.WeightKey, "1.0")
weight, err := strconv.ParseFloat(weightStr, 64)
if err != nil || weight <= constant.MinNacosWeight {
logger.Warnf("Invalid weight value %q, using default 1.0. err: %v", weightStr, err)
weight = constant.DefaultNacosWeight
} else if weight > constant.MaxNacosWeight {
logger.Warnf("Weight %f exceeds Nacos maximum 10000, setting to 10000", weight)
weight = constant.MaxNacosWeight
}
instance := vo.RegisterInstanceParam{
Ip: url.Ip,
Port: uint64(port),
Metadata: params,
Weight: weight,
Enable: true,
Healthy: true,
Ephemeral: true,
ServiceName: serviceName,
GroupName: groupName,
}
return instance
}
// Register will register the service @url to its nacos registry center.
func (nr *nacosRegistry) Register(url *common.URL) error {
start := time.Now()
serviceName := getServiceName(url)
groupName := nr.URL.GetParam(constant.NacosGroupKey, defaultGroup)
param := createRegisterParam(url, serviceName, groupName)
logger.Infof("[Nacos Registry] Registry instance with param = %+v", param)
isRegistry, err := nr.namingClient.Client().RegisterInstance(param)
metrics.Publish(metricsRegistry.NewRegisterEvent(err == nil && isRegistry, start))
if err != nil {
return err
}
if !isRegistry {
return perrors.New("registry [" + serviceName + "] to nacos failed")
}
nr.registryUrls = append(nr.registryUrls, url)
return nil
}
func createDeregisterParam(url *common.URL, serviceName string, groupName string) vo.DeregisterInstanceParam {
common.HandleRegisterIPAndPort(url)
port, _ := strconv.Atoi(url.Port)
return vo.DeregisterInstanceParam{
Ip: url.Ip,
Port: uint64(port),
ServiceName: serviceName,
GroupName: groupName,
Ephemeral: true,
}
}
// UnRegister returns nil if unregister successfully. If not, returns an error.
func (nr *nacosRegistry) UnRegister(url *common.URL) error {
serviceName := getServiceName(url)
groupName := nr.URL.GetParam(constant.NacosGroupKey, defaultGroup)
param := createDeregisterParam(url, serviceName, groupName)
isDeRegistry, err := nr.namingClient.Client().DeregisterInstance(param)
if err != nil {
return err
}
if !isDeRegistry {
return perrors.New("DeRegistry [" + serviceName + "] to nacos failed")
}
return nil
}
// Subscribe returns nil if subscribing registry successfully. If not returns an error.
func (nr *nacosRegistry) Subscribe(url *common.URL, notifyListener registry.NotifyListener) error {
role, _ := strconv.Atoi(url.GetParam(constant.RegistryRoleKey, ""))
if role != common.CONSUMER {
return nil
}
serviceName := url.GetParam(constant.InterfaceKey, "")
var serviceNames []string
var err error
if serviceName == constant.AnyValue {
serviceNames, err = nr.getAllSubscribeServiceNames(url)
if err != nil {
return err
}
} else {
serviceNames = []string{getSubscribeName(url)}
}
return nr.subscribe(serviceNames, notifyListener)
}
// subscribe subscribe services
func (nr *nacosRegistry) subscribe(serviceNames []string, notifyListener registry.NotifyListener) error {
if len(serviceNames) == 0 {
logger.Warnf("No services to listen to.")
return nil
}
for {
if !nr.IsAvailable() {
logger.Warnf("event listener game over.")
return perrors.New("nacosRegistry is not available.")
}
var err error
for _, serviceName := range serviceNames {
listener := NewNacosListenerWithServiceName(serviceName, nr.URL, nr.namingClient)
err = listener.listenService(serviceName)
metrics.Publish(metricsRegistry.NewSubscribeEvent(err == nil))
if err != nil {
logger.Warnf("getAllServices() = err:%v", perrors.WithStack(err))
time.Sleep(time.Duration(RegistryConnDelay) * time.Second)
break
}
go nr.handleServiceEvents(listener, notifyListener)
}
if err == nil {
break
}
}
return nil
}
// getAllServices retrieves the list of all services from the registry
func (nr *nacosRegistry) getAllSubscribeServiceNames(url *common.URL) ([]string, error) {
services, err := nr.namingClient.Client().GetAllServicesInfo(vo.GetAllServiceInfoParam{
GroupName: nr.GetParam(constant.RegistryGroupKey, defaultGroup),
PageNo: 1,
PageSize: math.MaxInt32,
})
if err != nil {
logger.Errorf("query services error: %v", err)
return nil, err
}
var subScribeServiceNames []string
categories := strings.Split(url.GetParam(constant.CategoryKey, constant.DefaultCategory), constant.CommaSeparator)
for _, dom := range services.Doms {
if strings.Contains(dom, constant.NacosServiceNameSeparator) {
realCategory := strings.Split(dom, constant.NacosServiceNameSeparator)[0]
for _, item := range categories {
if item == realCategory {
subScribeServiceNames = append(subScribeServiceNames, dom)
}
}
}
}
return subScribeServiceNames, err
}
// handleServiceEvents receives service events from the listener and notifies the notifyListener
func (nr *nacosRegistry) handleServiceEvents(listener registry.Listener, notifyListener registry.NotifyListener) {
for {
serviceEvent, err := listener.Next()
if err != nil {
logger.Warnf("Selector.watch() = error{%v}", perrors.WithStack(err))
listener.Close()
return
}
logger.Infof("[Nacos Registry] Update begin, service event: %v", serviceEvent.String())
notifyListener.Notify(serviceEvent)
}
}
// UnSubscribe :
func (nr *nacosRegistry) UnSubscribe(url *common.URL, _ registry.NotifyListener) error {
param := createSubscribeParam(getSubscribeName(url), nr.URL, nil)
if param == nil {
return nil
}
err := nr.namingClient.Client().Unsubscribe(param)
if err != nil {
return perrors.New("UnSubscribe [" + param.ServiceName + "] to nacos failed")
}
return nil
}
// LoadSubscribeInstances load subscribe instance
func (nr *nacosRegistry) LoadSubscribeInstances(url *common.URL, notify registry.NotifyListener) error {
serviceName := getSubscribeName(url)
groupName := nr.GetURL().GetParam(constant.RegistryGroupKey, defaultGroup)
instances, err := nr.namingClient.Client().SelectAllInstances(vo.SelectAllInstancesParam{
ServiceName: serviceName,
GroupName: groupName,
})
if err != nil {
return perrors.New(fmt.Sprintf("could not query the instances for serviceName=%s,groupName=%s,error=%v",
serviceName, groupName, err))
}
for i := range instances {
if newUrl := generateUrl(instances[i]); newUrl != nil {
notify.Notify(®istry.ServiceEvent{Action: remoting.EventTypeAdd, Service: newUrl})
}
}
return nil
}
func createSubscribeParam(serviceName string, regUrl *common.URL, cb callback) *vo.SubscribeParam {
groupName := regUrl.GetParam(constant.RegistryGroupKey, defaultGroup)
if cb == nil {
v, ok := listenerCache.Load(serviceName + groupName)
if !ok {
return nil
}
listener, ok := v.(*nacosListener)
if !ok {
return nil
}
cb = listener.Callback
}
return &vo.SubscribeParam{
ServiceName: serviceName,
SubscribeCallback: cb,
GroupName: groupName,
}
}
// GetURL gets its registration URL
func (nr *nacosRegistry) GetURL() *common.URL {
return nr.URL
}
// IsAvailable determines nacos registry center whether it is available
func (nr *nacosRegistry) IsAvailable() bool {
// TODO
return true
}
// nolint
func (nr *nacosRegistry) Destroy() {
for _, url := range nr.registryUrls {
err := nr.UnRegister(url)
logger.Infof("DeRegister Nacos URL:%+v", url)
if err != nil {
logger.Errorf("Deregister URL:%+v err:%v", url, err.Error())
}
}
return
}
// newNacosRegistry will create new instance
func newNacosRegistry(url *common.URL) (registry.Registry, error) {
logger.Infof("[Nacos Registry] New nacos registry with url = %+v", url.ToMap())
// key transfer: registry -> nacos
url.SetParam(constant.NacosNamespaceID, url.GetParam(constant.RegistryNamespaceKey, ""))
url.SetParam(constant.NacosUsername, url.Username)
url.SetParam(constant.NacosPassword, url.Password)
url.SetParam(constant.NacosAccessKey, url.GetParam(constant.RegistryAccessKey, ""))
url.SetParam(constant.NacosSecretKey, url.GetParam(constant.RegistrySecretKey, ""))
url.SetParam(constant.NacosTimeout, url.GetParam(constant.RegistryTimeoutKey, constant.DefaultRegTimeout))
url.SetParam(constant.NacosGroupKey, url.GetParam(constant.RegistryGroupKey, defaultGroup))
namingClient, err := nacos.NewNacosClientByURL(url)
if err != nil {
return &nacosRegistry{}, err
}
tmpRegistry := &nacosRegistry{
URL: url, // registry.group is recorded at this url
namingClient: namingClient,
registryUrls: []*common.URL{},
}
return tmpRegistry, nil
}