plugin/registry/nacos/nacos.go (199 lines of code) (raw):
// Licensed to the Apache Software Foundation (ASF) under one or more
// contributor license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright ownership.
// The ASF licenses this file to You under the Apache License, Version 2.0
// (the "License"); you may not use this file except in compliance with
// the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package nacos
import (
"fmt"
"github.com/apache/incubator-eventmesh/eventmesh-server-go/config"
"github.com/apache/incubator-eventmesh/eventmesh-server-go/plugin"
"github.com/apache/incubator-eventmesh/eventmesh-server-go/plugin/registry"
"github.com/gogf/gf/util/gconv"
"github.com/nacos-group/nacos-sdk-go/v2/clients"
"github.com/nacos-group/nacos-sdk-go/v2/clients/naming_client"
"github.com/nacos-group/nacos-sdk-go/v2/common/constant"
"github.com/nacos-group/nacos-sdk-go/v2/vo"
"go.uber.org/atomic"
"net"
"strconv"
"strings"
"sync"
"time"
)
var (
defaultConnectTimeout = 5000 * time.Millisecond
DefaultClusterName = "DEFAULT"
DefaultGroupName = "DEFAULT_GROUP"
DefaultWeight float64 = 10
protoList = []string{"TCP", "HTTP", "GRPC"}
)
func init() {
plugin.Register("nacos", &Registry{})
}
type Registry struct {
initStatus *atomic.Bool
startStatus *atomic.Bool
cfg *Config
client naming_client.INamingClient
registryInfos *sync.Map // map[string]*registry.EventMeshRegisterInfo
}
func (r *Registry) Type() string {
return registry.Type
}
func (r *Registry) Setup(name string, dec plugin.Decoder) error {
conf := &Config{}
if err := dec.Decode(conf); err != nil {
return err
}
r.cfg = conf
return nil
}
func (r *Registry) Init() error {
r.initStatus = atomic.NewBool(true)
r.startStatus = atomic.NewBool(false)
r.registryInfos = new(sync.Map)
return nil
}
func (r *Registry) Start() error {
var p vo.NacosClientParam
var addresses []string
if len(r.cfg.AddressList) > 0 {
addresses = strings.Split(r.cfg.AddressList, ",")
}
for _, address := range addresses {
ip, port, err := net.SplitHostPort(address)
if err != nil {
return err
}
p.ServerConfigs = append(p.ServerConfigs, constant.ServerConfig{IpAddr: ip, Port: gconv.Uint64(port)})
}
p.ClientConfig = &constant.ClientConfig{
TimeoutMs: uint64(defaultConnectTimeout.Milliseconds()),
CacheDir: r.cfg.CacheDir,
}
cli, err := clients.NewNamingClient(p)
if err != nil {
return err
}
r.startStatus.Store(true)
r.client = cli
return nil
}
func (r *Registry) Shutdown() error {
r.startStatus.Store(false)
r.initStatus.Store(false)
r.client.CloseClient()
return nil
}
func (r *Registry) FindEventMeshInfoByCluster(clusterName string) ([]*registry.EventMeshDataInfo, error) {
var infos []*registry.EventMeshDataInfo
meshServerName := config.GlobalConfig().Common.Name
cluster := config.GlobalConfig().Common.Cluster
for _, proto := range protoList {
ins, err := r.client.SelectInstances(vo.SelectInstancesParam{
Clusters: []string{clusterName},
ServiceName: fmt.Sprintf("%v-%v", meshServerName, proto),
GroupName: cluster,
HealthyOnly: true,
})
if err != nil {
return nil, err
}
if len(ins) == 0 {
continue
}
for _, in := range ins {
infos = append(infos, ®istry.EventMeshDataInfo{
EventMeshClusterName: in.ClusterName,
EventMeshName: in.ServiceName,
Endpoint: fmt.Sprintf("%v:%v", in.Ip, in.Port),
LastUpdateTimestamp: time.Time{},
Metadata: in.Metadata,
})
}
}
return infos, nil
}
func (r *Registry) FindAllEventMeshInfo() ([]*registry.EventMeshDataInfo, error) {
var infos []*registry.EventMeshDataInfo
meshServerName := config.GlobalConfig().Common.Name
for _, proto := range protoList {
ins, err := r.client.SelectInstances(vo.SelectInstancesParam{
Clusters: []string{},
ServiceName: fmt.Sprintf("%v-%v", meshServerName, proto),
GroupName: "GROUP",
HealthyOnly: true,
})
if err != nil {
return nil, err
}
if len(ins) == 0 {
continue
}
for _, in := range ins {
infos = append(infos, ®istry.EventMeshDataInfo{
EventMeshClusterName: in.ClusterName,
EventMeshName: in.ServiceName,
Endpoint: fmt.Sprintf("%v:%v", in.Ip, in.Port),
LastUpdateTimestamp: time.Time{},
Metadata: in.Metadata,
})
}
}
return infos, nil
}
// FindEventMeshClientDistributionData not used
// deprecate
func (r *Registry) FindEventMeshClientDistributionData(clusterName, group, purpose string) (map[string]map[string]int, error) {
return nil, nil
}
func (r *Registry) RegisterMetadata(map[string]string) {
r.registryInfos.Range(func(key, value any) bool {
r.Register(value.(*registry.EventMeshRegisterInfo))
return true
})
}
func (r *Registry) Register(info *registry.EventMeshRegisterInfo) error {
ipPort := strings.Split(info.EndPoint, ":")
if len(ipPort) != 2 {
return fmt.Errorf("endpoint format err")
}
ip := ipPort[0]
port, err := strconv.ParseInt(ipPort[1], 10, 64)
if err != nil {
return err
}
_, err = r.client.RegisterInstance(vo.RegisterInstanceParam{
Ip: ip,
Port: uint64(port),
ServiceName: info.EventMeshName,
GroupName: uniqGroupName(info.ProtocolType),
Healthy: true,
Enable: true,
Weight: DefaultWeight,
})
if err != nil {
return err
}
r.registryInfos.Store(info.EventMeshName, info)
return nil
}
func (r *Registry) UnRegister(info *registry.EventMeshUnRegisterInfo) error {
ipPort := strings.Split(info.EndPoint, ":")
if len(ipPort) != 2 {
return fmt.Errorf("endpoint format err")
}
ip := ipPort[0]
port, err := strconv.ParseInt(ipPort[1], 10, 64)
if err != nil {
return err
}
_, err = r.client.DeregisterInstance(vo.DeregisterInstanceParam{
Ip: ip,
Port: uint64(port),
ServiceName: info.EventMeshName,
GroupName: uniqGroupName(info.ProtocolType),
})
if err != nil {
return err
}
r.registryInfos.Delete(info.EventMeshName)
return nil
}
func uniqGroupName(protoType string) string {
return fmt.Sprintf("%s-GROUP", protoType)
}