pkg/admin/services/registry_service_sync.go (132 lines of code) (raw):

/* * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ package services import ( "net/url" "strings" "sync" "github.com/apache/dubbo-admin/pkg/logger" "github.com/apache/dubbo-admin/pkg/admin/cache" "github.com/apache/dubbo-admin/pkg/admin/constant" util2 "github.com/apache/dubbo-admin/pkg/admin/util" "dubbo.apache.org/dubbo-go/v3/common" "dubbo.apache.org/dubbo-go/v3/registry" ) var ( SUBSCRIBE *common.URL UrlIdsMapper sync.Map ) func init() { queryParams := url.Values{ constant.InterfaceKey: {constant.AnyValue}, constant.GroupKey: {constant.AnyValue}, constant.VersionKey: {constant.AnyValue}, constant.ClassifierKey: {constant.AnyValue}, constant.CategoryKey: {constant.ProvidersCategory + "," + constant.ConsumersCategory + "," + constant.RoutersCategory + "," + constant.ConfiguratorsCategory}, constant.EnabledKey: {constant.AnyValue}, constant.CheckKey: {"false"}, } SUBSCRIBE, _ = common.NewURL(common.GetLocalIp()+":0", common.WithProtocol(constant.AdminProtocol), common.WithParams(queryParams), ) } func StartSubscribe(registry registry.Registry) { registry.Subscribe(SUBSCRIBE, adminNotifyListener{}) } func DestroySubscribe(registry registry.Registry) { registry.Destroy() } type adminNotifyListener struct{} func (adminNotifyListener) Notify(event *registry.ServiceEvent) { // TODO implement me serviceURL := event.Service var interfaceName string categories := make(map[string]map[string]map[string]*common.URL) category := serviceURL.GetParam(constant.CategoryKey, "") if len(category) == 0 { if constant.ConsumerSide == serviceURL.GetParam(constant.Side, "") || constant.ConsumerProtocol == serviceURL.Protocol { category = constant.ConsumersCategory } else { category = constant.ProvidersCategory } } if strings.EqualFold(constant.EmptyProtocol, serviceURL.Protocol) { if services, ok := cache.InterfaceRegistryCache.Load(category); ok { if services != nil { servicesMap, ok := services.(*sync.Map) if !ok { // servicesMap type error logger.Logger().Error("servicesMap type not *sync.Map") return } group := serviceURL.Group() version := serviceURL.Version() if constant.AnyValue != group && constant.AnyValue != version { servicesMap.Delete(serviceURL.Service()) } else { // iterator services servicesMap.Range(func(key, value interface{}) bool { if util2.GetInterface(key.(string)) == serviceURL.Service() && (constant.AnyValue == group || group == util2.GetGroup(key.(string))) && (constant.AnyValue == version || version == util2.GetVersion(key.(string))) { servicesMap.Delete(key) } return true }) } } } } else { interfaceName = serviceURL.Service() var services map[string]map[string]*common.URL if s, ok := categories[category]; ok { services = s } else { services = make(map[string]map[string]*common.URL) categories[category] = services } service := serviceURL.ServiceKey() ids, found := services[service] if !found { ids = make(map[string]*common.URL) services[service] = ids } if md5, ok := UrlIdsMapper.Load(serviceURL.Key()); ok { ids[md5.(string)] = serviceURL } else { md5 := util2.Md5_16bit(serviceURL.Key()) ids[md5] = serviceURL UrlIdsMapper.LoadOrStore(serviceURL.Key(), md5) } } // check categories size if len(categories) > 0 { for category, value := range categories { services, ok := cache.InterfaceRegistryCache.Load(category) if ok { servicesMap, ok := services.(*sync.Map) if !ok { // servicesMap type error logger.Logger().Error("servicesMap type not *sync.Map") return } // iterator services key set servicesMap.Range(func(key, inner any) bool { _, ok := value[key.(string)] if util2.GetInterface(key.(string)) == interfaceName && !ok { servicesMap.Delete(key) } return true }) } else { services = &sync.Map{} cache.InterfaceRegistryCache.Store(category, services) } for k, v := range value { services.(*sync.Map).Store(k, v) } } } } func (adminNotifyListener) NotifyAll(events []*registry.ServiceEvent, f func()) { for _, event := range events { adminNotifyListener{}.Notify(event) } }