pixiu/pkg/adapter/springcloud/servicediscovery/zookeeper/zk_discovery.go (292 lines of code) (raw):

/* * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ package zookeeper import ( "encoding/json" "path" "strings" "sync" "time" ) import ( gxzookeeper "github.com/dubbogo/gost/database/kv/zk" ) import ( "github.com/apache/dubbo-go-pixiu/pixiu/pkg/adapter/springcloud/common" "github.com/apache/dubbo-go-pixiu/pixiu/pkg/adapter/springcloud/servicediscovery" "github.com/apache/dubbo-go-pixiu/pixiu/pkg/logger" "github.com/apache/dubbo-go-pixiu/pixiu/pkg/model" "github.com/apache/dubbo-go-pixiu/pixiu/pkg/remote/zookeeper" ) const ( ZKRootPath = "/services" ZKName = "SpringCloud-Zookeeper" StatUP = "UP" MaxFailTimes = 3 DefaultTimeout = "3s" ConnDelay = 3 * time.Second defaultTTL = 3 * time.Second ) type zookeeperDiscovery struct { basePath string targetService []string listener servicediscovery.ServiceEventListener instanceMapLock sync.Mutex instanceMap map[string]*servicediscovery.ServiceInstance zkListener map[string]zookeeper.Listener clientFacade *BaseZkClientFacade } func NewZKServiceDiscovery(targetService []string, config *model.RemoteConfig, listener servicediscovery.ServiceEventListener) (servicediscovery.ServiceDiscovery, error) { var err error if len(config.Timeout) == 0 { config.Timeout = DefaultTimeout } client, err := zookeeper.NewZkClient(ZKName, config) if err != nil { return nil, err } rootPath := ZKRootPath if len(strings.TrimSpace(config.Root)) > 0 { rootPath = strings.TrimSpace(config.Root) } z := &zookeeperDiscovery{ basePath: rootPath, listener: listener, targetService: targetService, instanceMap: make(map[string]*servicediscovery.ServiceInstance), zkListener: map[string]zookeeper.Listener{}, clientFacade: &BaseZkClientFacade{ name: ZKName, client: client, conf: config, clientLock: sync.Mutex{}, wg: sync.WaitGroup{}, done: make(chan struct{}), }, } go zookeeper.HandleClientRestart(z.clientFacade) return z, err } func (sd *zookeeperDiscovery) QueryAllServices() ([]servicediscovery.ServiceInstance, error) { serviceNames, err := sd.queryForNames() logger.Debugf("%s get all services by root path %s, services %v", common.ZKLogDiscovery, sd.basePath, serviceNames) if err != nil { logger.Errorf("get all services error: %v", err.Error()) return nil, err } return sd.QueryServicesByName(serviceNames) } func (sd *zookeeperDiscovery) QueryServicesByName(serviceNames []string) ([]servicediscovery.ServiceInstance, error) { var instancesAll []servicediscovery.ServiceInstance for _, s := range serviceNames { pathForName := sd.pathForName(s) ids, err := sd.getClient().GetChildren(pathForName) logger.Debugf("%s get services %s, services instanceIds %s", common.ZKLogDiscovery, s, ids) if err != nil { // todo refactor gost zk, make it return the definite err if strings.Contains(err.Error(), "none children") { logger.Debugf("%s get nodes from zookeeper fail: %s", common.ZKLogDiscovery, err.Error()) } else { logger.Errorf("%s get services [%s] nodes from zookeeper fail: %s", common.ZKLogDiscovery, s, err.Error()) } continue } for _, id := range ids { var instance *servicediscovery.ServiceInstance instance, err = sd.queryForInstance(s, id) if err != nil { return nil, err } instancesAll = append(instancesAll, *instance) } } for _, instance := range instancesAll { if sd.instanceMap[instance.ID] == nil { sd.instanceMap[instance.ID] = &instance } } return instancesAll, nil } func (sd *zookeeperDiscovery) Register() error { logger.Debugf("%s Register implement me!!", common.ZKLogDiscovery) return nil } func (sd *zookeeperDiscovery) UnRegister() error { logger.Debugf("%s UnRegister implement me!!", common.ZKLogDiscovery) return nil } func (sd *zookeeperDiscovery) getClient() *gxzookeeper.ZookeeperClient { if err := zookeeper.ValidateZookeeperClient(sd.clientFacade, "zka3"); err != nil { logger.Errorf("%s ValidateZookeeperClient error %s", common.ZKLogDiscovery, err) } return sd.clientFacade.ZkClient() } func (sd *zookeeperDiscovery) Subscribe() error { logger.Debugf("%s Subscribe ...", common.ZKLogDiscovery) sd.zkListener[sd.basePath] = newZkAppListener(sd) sd.zkListener[sd.basePath].WatchAndHandle() logger.Debugf("%s Subscribe Success!", common.ZKLogDiscovery) return nil } func (sd *zookeeperDiscovery) Unsubscribe() error { logger.Debugf("%s Unsubscribe ...", common.ZKLogDiscovery) for k, listener := range sd.zkListener { logger.Infof("Unsubscribe listener %s", k) listener.Close() } logger.Debugf("%s Unsubscribe Success!", common.ZKLogDiscovery) return nil } func (sd *zookeeperDiscovery) queryForInstance(name string, id string) (*servicediscovery.ServiceInstance, error) { path := sd.pathForInstance(name, id) data, _, err := sd.getClient().GetContent(path) if err != nil { return nil, err } sczk := &SpringCloudZKInstance{} instance := &servicediscovery.ServiceInstance{} err = json.Unmarshal(data, sczk) if err != nil { return nil, err } instance.Port = sczk.Port instance.ServiceName = sczk.Name instance.Host = sczk.Address instance.ID = sczk.ID instance.CLusterName = sczk.Name instance.Healthy = sczk.Payload.Metadata.InstanceStatus == StatUP return instance, nil } func (sd *zookeeperDiscovery) getServiceMap() map[string][]*servicediscovery.ServiceInstance { m := make(map[string][]*servicediscovery.ServiceInstance) for _, instance := range sd.instanceMap { if instances := m[instance.ServiceName]; instances == nil { m[instance.ServiceName] = []*servicediscovery.ServiceInstance{} } m[instance.ServiceName] = append(m[instance.ServiceName], instance) } return m } func (sd *zookeeperDiscovery) delServiceInstance(instance *servicediscovery.ServiceInstance) (bool, error) { if instance == nil { return true, nil } defer sd.instanceMapLock.Unlock() sd.instanceMapLock.Lock() if sd.instanceMap[instance.ID] != nil { sd.listener.OnDeleteServiceInstance(instance) delete(sd.instanceMap, instance.ID) } return true, nil } func (sd *zookeeperDiscovery) updateServiceInstance(instance *servicediscovery.ServiceInstance) (bool, error) { if instance == nil { return true, nil } defer sd.instanceMapLock.Unlock() sd.instanceMapLock.Lock() if sd.instanceMap[instance.ID] != nil { sd.listener.OnUpdateServiceInstance(instance) sd.instanceMap[instance.ID] = instance } return true, nil } func (sd *zookeeperDiscovery) addServiceInstance(instance *servicediscovery.ServiceInstance) (bool, error) { if instance == nil { return true, nil } defer sd.instanceMapLock.Unlock() sd.instanceMapLock.Lock() if sd.instanceMap[instance.ID] == nil { sd.listener.OnAddServiceInstance(instance) sd.instanceMap[instance.ID] = instance } return true, nil } func (sd *zookeeperDiscovery) queryForNames() ([]string, error) { children, err := sd.getClient().GetChildren(sd.basePath) // todo refactor gost zk, make it return the definite err if err != nil && strings.Contains(err.Error(), "none children") { logger.Debugf("%s get nodes from zookeeper fail: %s", common.ZKLogDiscovery, err.Error()) return nil, nil } return children, err } func (sd *zookeeperDiscovery) pathForInstance(name, id string) string { return path.Join(sd.basePath, name, id) } func (sd *zookeeperDiscovery) pathForName(name string) string { return path.Join(sd.basePath, name) } type SpringCloudZKInstance struct { Name string `json:"name"` ID string `json:"id"` Address string `json:"address"` Port int `json:"port"` SslPort interface{} `json:"sslPort"` Payload struct { Class string `json:"@class"` ID string `json:"id"` Name string `json:"name"` Metadata struct { InstanceStatus string `json:"instance_status"` } `json:"metadata"` } `json:"payload"` RegistrationTimeUTC int64 `json:"registrationTimeUTC"` ServiceType string `json:"serviceType"` URISpec struct { Parts []struct { Value string `json:"value"` Variable bool `json:"variable"` } `json:"parts"` } `json:"uriSpec"` } type BaseZkClientFacade struct { name string client *gxzookeeper.ZookeeperClient clientLock sync.Mutex wg sync.WaitGroup done chan struct{} conf *model.RemoteConfig } func (b *BaseZkClientFacade) Name() string { return b.name } func (b *BaseZkClientFacade) ZkClient() *gxzookeeper.ZookeeperClient { return b.client } func (b *BaseZkClientFacade) SetZkClient(client *gxzookeeper.ZookeeperClient) { b.client = client } func (b *BaseZkClientFacade) ZkClientLock() *sync.Mutex { return &b.clientLock } func (b *BaseZkClientFacade) WaitGroup() *sync.WaitGroup { return &b.wg } func (b *BaseZkClientFacade) Done() chan struct{} { return b.done } func (b *BaseZkClientFacade) RestartCallBack() bool { return true } func (b *BaseZkClientFacade) GetConfig() *model.RemoteConfig { return b.conf } func Keys(m map[string][]*servicediscovery.ServiceInstance) []string { j := 0 keys := make([]string, len(m)) for k := range m { keys[j] = k j++ } return keys } func Diff(a, b []string) (diff []string) { m := make(map[string]bool) for _, item := range b { m[item] = true } for _, item := range a { if _, ok := m[item]; !ok { diff = append(diff, item) } } return diff }