pkg/core/registry/application_context.go (205 lines of code) (raw):

/* * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ package registry import ( "dubbo.apache.org/dubbo-go/v3/common" "dubbo.apache.org/dubbo-go/v3/common/constant" "dubbo.apache.org/dubbo-go/v3/metadata/info" "dubbo.apache.org/dubbo-go/v3/registry" gxset "github.com/dubbogo/gost/container/set" "strings" "sync" ) type ApplicationContext struct { // InterfaceName []*common.URL serviceUrls sync.Map // Revision *info.MetadataInfo revisionToMetadata sync.Map // AppName []registry.ServiceInstance allInstances sync.Map // Mappings *gxset.HashSet mappings sync.Map } func NewApplicationContext() *ApplicationContext { return &ApplicationContext{} } // GetServiceUrls returns the reference to the serviceUrls map with read lock func (ac *ApplicationContext) GetServiceUrls() map[string][]*common.URL { result := make(map[string][]*common.URL) ac.serviceUrls.Range(func(key, value interface{}) bool { result[key.(string)] = value.([]*common.URL) return true }) return result } func (ac *ApplicationContext) DeleteServiceUrl(key string, url *common.URL) { if v, ok := ac.serviceUrls.Load(key); ok { urls := v.([]*common.URL) for i, u := range urls { if urlEqual(u, url) { urls = append(urls[:i], urls[i+1:]...) ac.serviceUrls.Store(key, urls) return } } } } func (ac *ApplicationContext) UpdateServiceUrls(interfaceKey string, url *common.URL) { v, _ := ac.serviceUrls.LoadOrStore(interfaceKey, []*common.URL{}) urls := v.([]*common.URL) urls = append(urls, url) ac.serviceUrls.Store(interfaceKey, urls) } func (ac *ApplicationContext) AddServiceUrls(newServiceUrls map[string][]*common.URL) { for k, v := range newServiceUrls { var urls []*common.URL if existingV, ok := ac.serviceUrls.Load(k); ok { urls = existingV.([]*common.URL) } urlExists := func(newUrl *common.URL) bool { for _, url := range urls { if urlEqual(url, newUrl) { return true } } return false } if urls == nil { ac.serviceUrls.Store(k, v) } else { for _, newUrl := range v { if !urlExists(newUrl) { urls = append(urls, newUrl) } } ac.serviceUrls.Store(k, urls) } } } // GetRevisionToMetadata returns the reference to the revisionToMetadata map with read lock func (ac *ApplicationContext) GetRevisionToMetadata(revision string) *info.MetadataInfo { if v, ok := ac.revisionToMetadata.Load(revision); ok { return v.(*info.MetadataInfo) } return nil } func (ac *ApplicationContext) UpdateRevisionToMetadata(key string, newKey string, value *info.MetadataInfo) { if key == newKey { return } if key != "" { ac.revisionToMetadata.Delete(key) } ac.revisionToMetadata.Store(newKey, value) } func (ac *ApplicationContext) DeleteRevisionToMetadata(key string) { if key != "" { ac.revisionToMetadata.Delete(key) } } func (ac *ApplicationContext) NewRevisionToMetadata(newRevisionToMetadata map[string]*info.MetadataInfo) { for k, v := range newRevisionToMetadata { ac.revisionToMetadata.Store(k, v) } } func (ac *ApplicationContext) GetOldRevision(instance registry.ServiceInstance) string { if v, ok := ac.allInstances.Load(instance.GetServiceName()); ok { for _, elem := range v.([]registry.ServiceInstance) { if instance.GetID() == elem.GetID() { return elem.GetMetadata()[constant.ExportedServicesRevisionPropertyName] } } } return "" } // GetAllInstances returns the reference to the allInstances map with read lock func (ac *ApplicationContext) GetAllInstances() map[string][]registry.ServiceInstance { result := make(map[string][]registry.ServiceInstance) ac.allInstances.Range(func(key, value interface{}) bool { result[key.(string)] = value.([]registry.ServiceInstance) return true }) return result } func (ac *ApplicationContext) DeleteAllInstance(key string, instance registry.ServiceInstance) { if v, ok := ac.allInstances.Load(key); ok { instances := v.([]registry.ServiceInstance) for i, serviceInstance := range instances { if serviceInstance.GetID() == instance.GetID() { instances = append(instances[:i], instances[i+1:]...) ac.allInstances.Store(key, instances) return } } } } func (ac *ApplicationContext) UpdateAllInstances(key string, instance registry.ServiceInstance) { if v, ok := ac.allInstances.Load(key); ok { instances := v.([]registry.ServiceInstance) for i, serviceInstance := range instances { if serviceInstance.GetID() == instance.GetID() { instances[i] = serviceInstance ac.allInstances.Store(key, instances) return } } } v, _ := ac.allInstances.LoadOrStore(key, []registry.ServiceInstance{}) instances := v.([]registry.ServiceInstance) instances = append(instances, instance) ac.allInstances.Store(key, instances) } func (ac *ApplicationContext) AddAllInstances(key string, value []registry.ServiceInstance) { v, _ := ac.allInstances.LoadOrStore(key, []registry.ServiceInstance{}) instances := v.([]registry.ServiceInstance) instanceExists := func(instance registry.ServiceInstance) bool { for i, inst := range instances { if inst.GetID() == instance.GetID() { instances[i] = instance return true } } return false } for _, inst := range value { if !instanceExists(inst) { instances = append(instances, inst) } } ac.allInstances.Store(key, instances) } func (ac *ApplicationContext) UpdateMapping(mapping map[string]*gxset.HashSet) { for k, v := range mapping { ac.mappings.Store(k, v) } } func (ac *ApplicationContext) GetMapping() map[string]*gxset.HashSet { result := make(map[string]*gxset.HashSet) ac.mappings.Range(func(key, value interface{}) bool { result[key.(string)] = value.(*gxset.HashSet) return true }) return result } func urlEqual(url *common.URL, c *common.URL) bool { tmpC := c.Clone() tmpURL := url.Clone() cGroup := tmpC.GetParam(constant.GroupKey, "") urlGroup := tmpURL.GetParam(constant.GroupKey, "") cKey := tmpC.Key() urlKey := tmpURL.Key() if cGroup == constant.AnyValue { cKey = strings.Replace(cKey, "group=*", "group="+urlGroup, 1) } else if urlGroup == constant.AnyValue { urlKey = strings.Replace(urlKey, "group=*", "group="+cGroup, 1) } // 1. protocol, username, password, ip, port, service name, group, version should be equal if cKey != urlKey { return false } // 2. if URL contains enabled key, should be true, or * if tmpURL.GetParam(constant.EnabledKey, "true") != "true" && tmpURL.GetParam(constant.EnabledKey, "") != constant.AnyValue { return false } return isMatchCategory(tmpURL.GetParam(constant.CategoryKey, constant.DefaultCategory), tmpC.GetParam(constant.CategoryKey, constant.DefaultCategory)) } func isMatchCategory(category1 string, category2 string) bool { if len(category2) == 0 { return category1 == constant.DefaultCategory } else if strings.Contains(category2, constant.AnyValue) { return true } else if strings.Contains(category2, constant.RemoveValuePrefix) { return !strings.Contains(category2, constant.RemoveValuePrefix+category1) } else { return strings.Contains(category2, category1) } }