registry/zookeeper/watcher.go (681 lines of code) (raw):
// Copyright (c) 2022 Alibaba Group Holding Ltd.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package zookeeper
import (
"encoding/json"
"errors"
"net/url"
"path"
"reflect"
"strconv"
"strings"
"sync"
"time"
"github.com/dubbogo/go-zookeeper/zk"
gxzookeeper "github.com/dubbogo/gost/database/kv/zk"
"github.com/hashicorp/go-multierror"
"go.uber.org/atomic"
"istio.io/api/networking/v1alpha3"
"istio.io/pkg/log"
apiv1 "github.com/alibaba/higress/api/networking/v1"
"github.com/alibaba/higress/pkg/common"
provider "github.com/alibaba/higress/registry"
"github.com/alibaba/higress/registry/memory"
)
type watchConfig struct {
exit chan struct{}
listen bool
}
type watcher struct {
provider.BaseWatcher
apiv1.RegistryConfig
WatchingServices map[string]watchConfig `json:"watching_services"`
RegistryType provider.ServiceRegistryType `json:"registry_type"`
Status provider.WatcherStatus `json:"status"`
serviceRemaind *atomic.Int32
cache memory.Cache
mutex *sync.Mutex
stop chan struct{}
zkClient *gxzookeeper.ZookeeperClient
reconnectCh <-chan struct{}
Done chan struct{}
seMux *sync.Mutex
serviceEntry map[string]InterfaceConfig
listIndex chan ListServiceConfig
listServiceChan chan struct{}
isStop bool
keepStaleWhenEmpty bool
zkServicesPath []string
}
type WatcherOption func(w *watcher)
func NewWatcher(cache memory.Cache, opts ...WatcherOption) (provider.Watcher, error) {
w := &watcher{
WatchingServices: make(map[string]watchConfig),
RegistryType: provider.Zookeeper,
Status: provider.UnHealthy,
cache: cache,
mutex: &sync.Mutex{},
stop: make(chan struct{}),
Done: make(chan struct{}),
seMux: &sync.Mutex{},
serviceEntry: make(map[string]InterfaceConfig),
listIndex: make(chan ListServiceConfig, 1),
listServiceChan: make(chan struct{}),
zkServicesPath: []string{SPRING_CLOUD_SERVICES},
}
timeout, _ := time.ParseDuration(DEFAULT_REG_TIMEOUT)
for _, opt := range opts {
opt(w)
}
var address []string
address = append(address, w.Domain+":"+strconv.Itoa(int(w.Port)))
newClient, cltErr := gxzookeeper.NewZookeeperClient("zk", address, false, gxzookeeper.WithZkTimeOut(timeout))
if cltErr != nil {
log.Errorf("[NewWatcher] NewWatcher zk, err:%v, zk address:%s", cltErr, address)
return nil, cltErr
}
valid := newClient.ZkConnValid()
if !valid {
log.Info("connect zk error")
return nil, errors.New("connect zk error")
}
w.reconnectCh = newClient.Reconnect()
w.zkClient = newClient
go func() {
w.HandleClientRestart()
}()
return w, nil
}
func WithKeepStaleWhenEmpty(enable bool) WatcherOption {
return func(w *watcher) {
w.keepStaleWhenEmpty = enable
}
}
func WithZkServicesPath(paths []string) WatcherOption {
return func(w *watcher) {
for _, path := range paths {
path = strings.TrimSuffix(path, common.Slash)
if path == DUBBO_SERVICES || path == SPRING_CLOUD_SERVICES {
continue
}
w.zkServicesPath = append(w.zkServicesPath, path)
}
}
}
func (w *watcher) HandleClientRestart() {
for {
select {
case <-w.reconnectCh:
w.reconnectCh = w.zkClient.Reconnect()
log.Info("zkclient reconnected")
w.RestartCallBack()
time.Sleep(10 * time.Microsecond)
case <-w.Done:
log.Info("[HandleClientRestart] receive registry destroy event, quit client restart handler")
return
}
}
}
func (w *watcher) RestartCallBack() bool {
err := w.fetchAllServices()
if err != nil {
log.Errorf("[RestartCallBack] fetch all service for zk err:%v", err)
return false
}
return true
}
type serviceInfo struct {
serviceType ServiceType
rootPath string
service string
}
func (w *watcher) fetchedServices(fetchedServices map[string]serviceInfo, path string, serviceType ServiceType) error {
children, err := w.zkClient.GetChildren(path)
if err != nil {
if err == gxzookeeper.ErrNilChildren || err == gxzookeeper.ErrNilNode ||
strings.Contains(err.Error(), "has none children") {
return nil
} else {
log.Errorf("[fetchAllServices] can not get children, err:%v, path:%s", err, path)
return err
}
}
info := serviceInfo{
serviceType: serviceType,
rootPath: path,
}
for _, child := range children {
if child == CONFIG || child == MAPPING || child == METADATA {
continue
}
var interfaceName string
switch serviceType {
case DubboService:
interfaceName = child
case SpringCloudService:
info.service = child
if path == "" || path == common.Slash {
interfaceName = child
break
}
interfaceName = child + "." + strings.ReplaceAll(
strings.TrimPrefix(path, common.Slash), common.Slash, common.Hyphen)
}
fetchedServices[interfaceName] = info
log.Debugf("fetchedServices, interface:%s, path:%s", interfaceName, info.rootPath)
}
return nil
}
func (w *watcher) fetchAllServices(firstFetch ...bool) error {
w.mutex.Lock()
defer w.mutex.Unlock()
if w.isStop {
return nil
}
fetchedServices := make(map[string]serviceInfo)
var result error
err := w.fetchedServices(fetchedServices, DUBBO_SERVICES, DubboService)
if err != nil {
result = multierror.Append(result, err)
}
for _, path := range w.zkServicesPath {
err = w.fetchedServices(fetchedServices, path, SpringCloudService)
if err != nil {
result = multierror.Append(result, err)
}
}
for interfaceName, value := range w.WatchingServices {
if _, exist := fetchedServices[interfaceName]; !exist {
if value.exit != nil {
close(value.exit)
}
delete(w.WatchingServices, interfaceName)
}
}
var serviceConfigs []ListServiceConfig
for interfaceName, serviceInfo := range fetchedServices {
if _, exist := w.WatchingServices[interfaceName]; !exist {
w.WatchingServices[interfaceName] = watchConfig{
exit: make(chan struct{}),
listen: true,
}
serviceConfig := ListServiceConfig{
ServiceType: serviceInfo.serviceType,
InterfaceName: interfaceName,
Exit: w.WatchingServices[interfaceName].exit,
}
switch serviceInfo.serviceType {
case DubboService:
serviceConfig.UrlIndex = DUBBO + interfaceName + PROVIDERS
case SpringCloudService:
serviceConfig.UrlIndex = path.Join(serviceInfo.rootPath, serviceInfo.service)
default:
return errors.New("unknown type")
}
serviceConfigs = append(serviceConfigs, serviceConfig)
}
}
if len(firstFetch) > 0 && firstFetch[0] {
w.serviceRemaind = atomic.NewInt32(int32(len(serviceConfigs)))
}
for _, service := range serviceConfigs {
w.listIndex <- service
}
return result
}
func (w *watcher) ListenService() {
defer func() {
w.listServiceChan <- struct{}{}
}()
ttl := DefaultTTL
var failTimes int
for {
select {
case listIndex := <-w.listIndex:
go func() {
for {
log.Info(listIndex.UrlIndex)
children, childEventCh, err := w.zkClient.GetChildrenW(listIndex.UrlIndex)
if err != nil {
failTimes++
if MaxFailTimes <= failTimes {
failTimes = MaxFailTimes
}
log.Errorf("[Zookeeper][ListenService] Get children of path zkRootPath with watcher failed, err:%v, index:%s", err, listIndex.UrlIndex)
// May be the provider does not ready yet, sleep failTimes * ConnDelay seconds to wait
after := time.After(timeSecondDuration(failTimes * ConnDelay))
select {
case <-after:
continue
case <-listIndex.Exit:
return
}
}
failTimes = 0
if len(children) > 0 {
w.ChildToServiceEntry(children, listIndex.InterfaceName, listIndex.UrlIndex, listIndex.ServiceType)
}
if w.serviceRemaind != nil {
w.serviceRemaind.Sub(1)
}
if w.startScheduleWatchTask(listIndex, children, ttl, childEventCh, listIndex.Exit) {
return
}
}
}()
case <-w.stop:
log.Info("[ListenService] is shutdown")
return
}
}
}
func (w *watcher) DataChange(eventType Event) bool {
//fmt.Println(eventType)
host, interfaceConfig, err := w.GetInterfaceConfig(eventType)
if err != nil {
log.Errorf("GetInterfaceConfig failed, err:%v, event:%v", err, eventType)
return false
}
if eventType.Action == EventTypeAdd || eventType.Action == EventTypeUpdate {
w.seMux.Lock()
isHave := false
value, ok := w.serviceEntry[host]
if ok {
for _, endpoint := range value.Endpoints {
if endpoint.Ip == interfaceConfig.Endpoints[0].Ip && endpoint.Port == interfaceConfig.Endpoints[0].Port {
isHave = true
}
}
if !isHave {
value.Endpoints = append(value.Endpoints, interfaceConfig.Endpoints[0])
}
w.serviceEntry[host] = value
} else {
w.serviceEntry[host] = *interfaceConfig
}
se := w.generateServiceEntry(w.serviceEntry[host])
w.seMux.Unlock()
w.cache.UpdateServiceWrapper(host, &memory.ServiceWrapper{
ServiceName: host,
ServiceEntry: se,
Suffix: "zookeeper",
RegistryType: w.Type,
RegistryName: w.Name,
})
w.UpdateService()
} else if eventType.Action == EventTypeDel {
w.seMux.Lock()
value, ok := w.serviceEntry[host]
if ok {
var endpoints []Endpoint
for _, endpoint := range value.Endpoints {
if endpoint.Ip == interfaceConfig.Endpoints[0].Ip && endpoint.Port == interfaceConfig.Endpoints[0].Port {
continue
} else {
endpoints = append(endpoints, endpoint)
}
}
value.Endpoints = endpoints
w.serviceEntry[host] = value
}
se := w.generateServiceEntry(w.serviceEntry[host])
w.seMux.Unlock()
//todo update
if len(se.Endpoints) == 0 {
if !w.keepStaleWhenEmpty {
w.cache.DeleteServiceWrapper(host)
}
} else {
w.cache.UpdateServiceWrapper(host, &memory.ServiceWrapper{
ServiceName: host,
ServiceEntry: se,
Suffix: "zookeeper",
RegistryType: w.Type,
RegistryName: w.Name,
})
}
w.UpdateService()
}
return true
}
func (w *watcher) GetInterfaceConfig(event Event) (string, *InterfaceConfig, error) {
switch event.ServiceType {
case DubboService:
return w.GetDubboConfig(event.Path)
case SpringCloudService:
return w.GetSpringCloudConfig(event.InterfaceName, event.Content)
default:
return "", nil, errors.New("unknown service type")
}
}
func (w *watcher) GetSpringCloudConfig(interfaceName string, content []byte) (string, *InterfaceConfig, error) {
var instance SpringCloudInstance
err := json.Unmarshal(content, &instance)
if err != nil {
log.Errorf("unmarshal failed, err:%v, content:%s", err, content)
return "", nil, err
}
var config InterfaceConfig
host := interfaceName
config.Host = host
config.Protocol = common.HTTP.String()
if len(instance.Payload.Metadata) > 0 && instance.Payload.Metadata["protocol"] != "" {
config.Protocol = common.ParseProtocol(instance.Payload.Metadata["protocol"]).String()
}
port := strconv.Itoa(instance.Port)
if port == "" {
return "", nil, errors.New("empty port")
}
endpoint := Endpoint{
Ip: instance.Address,
Port: port,
Metadata: instance.Payload.Metadata,
}
config.Endpoints = []Endpoint{endpoint}
config.ServiceType = SpringCloudService
return host, &config, nil
}
func (w *watcher) GetDubboConfig(dubboUrl string) (string, *InterfaceConfig, error) {
dubboUrl = strings.Replace(dubboUrl, "%3F", "?", 1)
dubboUrl = strings.ReplaceAll(dubboUrl, "%3D", "=")
dubboUrl = strings.ReplaceAll(dubboUrl, "%26", "&")
tempPath := strings.Replace(dubboUrl, DUBBO, "", -1)
urls := strings.Split(tempPath, PROVIDERS+"/dubbo")
key := urls[0]
serviceUrl, urlParseErr := url.Parse(dubboUrl)
if urlParseErr != nil {
return "", nil, urlParseErr
}
var (
dubboInterfaceConfig InterfaceConfig
host string
)
serviceUrl.Path = strings.Replace(serviceUrl.Path, DUBBO+key+PROVIDERS+"/dubbo://", "", -1)
values, err := url.ParseQuery(serviceUrl.RawQuery)
if err != nil {
return "", nil, err
}
paths := strings.Split(serviceUrl.Path, "/")
if len(paths) > 0 {
var group string
_, ok := values["group"]
if ok {
group = values["group"][0]
}
version := "0.0.0"
_, ok = values[VERSION]
if ok && len(values[VERSION]) > 0 {
version = values[VERSION][0]
}
dubboInterfaceConfig.Host = "providers:" + key + ":" + version + ":" + group
host = dubboInterfaceConfig.Host
dubboInterfaceConfig.Protocol = DUBBO_PROTOCOL
address := strings.Split(paths[0], ":")
if len(address) != 2 {
log.Infof("[GetDubboConfig] can not get dubbo ip and port, path:%s ", serviceUrl.Path)
return "", nil, errors.New("can not get dubbo ip and port")
}
metadata := make(map[string]string)
for key, value := range values {
if len(value) == 1 {
metadata[key] = value[0]
}
}
metadata[PROTOCOL] = DUBBO_PROTOCOL
dubboEndpoint := Endpoint{
Ip: address[0],
Port: address[1],
Metadata: metadata,
}
dubboInterfaceConfig.Endpoints = append(dubboInterfaceConfig.Endpoints, dubboEndpoint)
}
dubboInterfaceConfig.ServiceType = DubboService
return host, &dubboInterfaceConfig, nil
}
func (w *watcher) startScheduleWatchTask(serviceConfig ListServiceConfig, oldChildren []string, ttl time.Duration, childEventCh <-chan zk.Event, exit chan struct{}) bool {
zkRootPath := serviceConfig.UrlIndex
interfaceName := serviceConfig.InterfaceName
serviceType := serviceConfig.ServiceType
tickerTTL := ttl
if tickerTTL > 20e9 {
tickerTTL = 20e9
}
ticker := time.NewTicker(tickerTTL)
for {
select {
case <-ticker.C:
w.handleZkNodeEvent(zkRootPath, oldChildren, interfaceName, serviceType)
if tickerTTL < ttl {
tickerTTL *= 2
if tickerTTL > ttl {
tickerTTL = ttl
}
ticker.Stop()
ticker = time.NewTicker(tickerTTL)
}
case zkEvent := <-childEventCh:
if zkEvent.Type == zk.EventNodeChildrenChanged {
w.handleZkNodeEvent(zkEvent.Path, oldChildren, interfaceName, serviceType)
}
return false
case <-exit:
ticker.Stop()
return true
}
}
}
func (w *watcher) handleZkNodeEvent(zkPath string, oldChildren []string, interfaceName string, serviceType ServiceType) {
newChildren, err := w.zkClient.GetChildren(zkPath)
if err != nil {
if err == gxzookeeper.ErrNilChildren || err == gxzookeeper.ErrNilNode ||
strings.Contains(err.Error(), "has none children") {
content, _, connErr := w.zkClient.Conn.Get(zkPath)
if connErr != nil {
log.Errorf("[handleZkNodeEvent] Get new node path's content error:%v, path:%s", connErr, zkPath)
} else {
for _, c := range oldChildren {
path := path.Join(zkPath, c)
content, _, connErr = w.zkClient.Conn.Get(path)
if connErr != nil {
log.Errorf("[handleZkNodeEvent] Get node path's content error:%v, path:%s", connErr, path)
continue
}
w.DataChange(Event{
Path: path,
Action: EventTypeDel,
Content: content,
InterfaceName: interfaceName,
ServiceType: serviceType,
})
}
}
} else {
log.Errorf("zkClient get children failed, err:%v", err)
}
return
}
w.ChildToServiceEntry(newChildren, interfaceName, zkPath, serviceType)
}
func (w *watcher) ChildToServiceEntry(children []string, interfaceName, zkPath string, serviceType ServiceType) {
serviceEntry := make(map[string]InterfaceConfig)
switch serviceType {
case DubboService:
w.DubboChildToServiceEntry(serviceEntry, children, interfaceName, zkPath)
case SpringCloudService:
w.SpringCloudChildToServiceEntry(serviceEntry, children, interfaceName, zkPath)
default:
log.Error("unknown type")
}
if len(serviceEntry) != 0 {
w.seMux.Lock()
for host, config := range serviceEntry {
se := w.generateServiceEntry(config)
value, ok := w.serviceEntry[host]
if ok {
if !reflect.DeepEqual(value, config) {
w.serviceEntry[host] = config
//todo update or create serviceentry
w.cache.UpdateServiceWrapper(host, &memory.ServiceWrapper{
ServiceName: host,
ServiceEntry: se,
Suffix: "zookeeper",
RegistryType: w.Type,
RegistryName: w.Name,
})
}
} else {
w.serviceEntry[host] = config
w.cache.UpdateServiceWrapper(host, &memory.ServiceWrapper{
ServiceName: host,
ServiceEntry: se,
Suffix: "zookeeper",
RegistryType: w.Type,
RegistryName: w.Name,
})
}
}
w.seMux.Unlock()
w.UpdateService()
}
}
func (w *watcher) SpringCloudChildToServiceEntry(serviceEntry map[string]InterfaceConfig, children []string, interfaceName, zkPath string) {
for _, c := range children {
path := path.Join(zkPath, c)
content, _, err := w.zkClient.Conn.Get(path)
if err != nil {
log.Errorf("[handleZkNodeEvent] Get node path's content error:%v, path:%s", err, path)
continue
}
host, config, err := w.GetSpringCloudConfig(interfaceName, content)
if err != nil {
log.Errorf("GetSpringCloudConfig failed:%v", err)
continue
}
if existConfig, exist := serviceEntry[host]; !exist {
serviceEntry[host] = *config
} else {
existConfig.Endpoints = append(existConfig.Endpoints, config.Endpoints...)
serviceEntry[host] = existConfig
}
}
}
func (w *watcher) DubboChildToServiceEntry(serviceEntry map[string]InterfaceConfig, children []string, interfaceName, zkPath string) {
for _, c := range children {
path := path.Join(zkPath, c)
host, config, err := w.GetDubboConfig(path)
if err != nil {
log.Errorf("GetDubboConfig failed:%v", err)
continue
}
if existConfig, exist := serviceEntry[host]; !exist {
serviceEntry[host] = *config
} else {
existConfig.Endpoints = append(existConfig.Endpoints, config.Endpoints...)
serviceEntry[host] = existConfig
}
}
}
func (w *watcher) generateServiceEntry(config InterfaceConfig) *v1alpha3.ServiceEntry {
portList := make([]*v1alpha3.ServicePort, 0)
endpoints := make([]*v1alpha3.WorkloadEntry, 0)
for _, service := range config.Endpoints {
protocol := common.HTTP
if service.Metadata != nil && service.Metadata[PROTOCOL] != "" {
protocol = common.ParseProtocol(service.Metadata[PROTOCOL])
}
portNumber, _ := strconv.Atoi(service.Port)
port := &v1alpha3.ServicePort{
Name: protocol.String(),
Number: uint32(portNumber),
Protocol: protocol.String(),
}
if len(portList) == 0 {
portList = append(portList, port)
}
endpoints = append(endpoints, &v1alpha3.WorkloadEntry{
Address: service.Ip,
Ports: map[string]uint32{port.Protocol: port.Number},
Labels: service.Metadata,
Weight: 1,
})
}
se := &v1alpha3.ServiceEntry{
Hosts: []string{config.Host + ".zookeeper"},
Ports: portList,
Location: v1alpha3.ServiceEntry_MESH_INTERNAL,
Resolution: v1alpha3.ServiceEntry_STATIC,
Endpoints: endpoints,
}
return se
}
func (w *watcher) Run() {
defer func() {
log.Info("[zookeeper] Run is down")
if r := recover(); r != nil {
log.Info("Recovered in f", "r is", r)
}
}()
ticker := time.NewTicker(30 * time.Second)
defer ticker.Stop()
w.Status = provider.ProbeWatcherStatus(w.Domain, strconv.FormatUint(uint64(w.Port), 10))
go func() {
w.ListenService()
}()
firstFetchErr := w.fetchAllServices(true)
if firstFetchErr != nil {
log.Errorf("first fetch services failed:%v", firstFetchErr)
}
for {
select {
case <-ticker.C:
var needNewFetch bool
if w.watcherReady() {
w.Ready(true)
needNewFetch = true
}
if firstFetchErr != nil || needNewFetch {
firstFetchErr = w.fetchAllServices()
}
case <-w.stop:
return
case <-w.listServiceChan:
go func() {
w.ListenService()
}()
}
}
}
func (w *watcher) Stop() {
w.mutex.Lock()
for key, value := range w.WatchingServices {
if value.exit != nil {
close(value.exit)
}
delete(w.WatchingServices, key)
}
w.isStop = true
w.mutex.Unlock()
w.seMux.Lock()
for key := range w.serviceEntry {
w.cache.DeleteServiceWrapper(key)
}
w.UpdateService()
w.seMux.Unlock()
close(w.stop)
close(w.Done)
w.zkClient.Close()
w.Ready(false)
}
func (w *watcher) IsHealthy() bool {
return w.Status == provider.Healthy
}
func (w *watcher) GetRegistryType() string {
return w.RegistryType.String()
}
func (w *watcher) watcherReady() bool {
if w.serviceRemaind == nil {
return true
}
remaind := w.serviceRemaind.Load()
if remaind <= 0 {
return true
}
return false
}
func timeSecondDuration(sec int) time.Duration {
return time.Duration(sec) * time.Second
}