cluster/router/affinity/router.go (170 lines of code) (raw):

/* * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ package affinity import ( "math" "strings" "sync" ) import ( "github.com/dubbogo/gost/log/logger" "gopkg.in/yaml.v2" ) import ( "dubbo.apache.org/dubbo-go/v3/cluster/router/condition" "dubbo.apache.org/dubbo-go/v3/common" conf "dubbo.apache.org/dubbo-go/v3/common/config" "dubbo.apache.org/dubbo-go/v3/common/constant" "dubbo.apache.org/dubbo-go/v3/config" "dubbo.apache.org/dubbo-go/v3/config_center" "dubbo.apache.org/dubbo-go/v3/protocol" "dubbo.apache.org/dubbo-go/v3/remoting" ) type ServiceAffinityRoute struct { affinityRoute } func newServiceAffinityRoute() *ServiceAffinityRoute { return &ServiceAffinityRoute{} } func (s *ServiceAffinityRoute) Notify(invokers []protocol.Invoker) { if len(invokers) == 0 { return } url := invokers[0].GetURL() if url == nil { logger.Error("Failed to notify a Service Affinity rule, because url is empty") return } dynamicConfiguration := conf.GetEnvInstance().GetDynamicConfiguration() if dynamicConfiguration == nil { logger.Infof("Config center does not start, Affinity router will not be enabled") return } key := strings.Join([]string{url.ColonSeparatedKey(), constant.AffinityRuleSuffix}, "") dynamicConfiguration.AddListener(key, s) value, err := dynamicConfiguration.GetRule(key) if err != nil { logger.Errorf("Failed to query affinity rule, key=%s, err=%v", key, err) return } s.Process(&config_center.ConfigChangeEvent{Key: key, Value: value, ConfigType: remoting.EventTypeAdd}) } type ApplicationAffinityRoute struct { affinityRoute application string currentApplication string } func newApplicationAffinityRouter() *ApplicationAffinityRoute { applicationName := config.GetApplicationConfig().Name a := &ApplicationAffinityRoute{ currentApplication: applicationName, } dynamicConfiguration := conf.GetEnvInstance().GetDynamicConfiguration() if dynamicConfiguration != nil { dynamicConfiguration.AddListener(strings.Join([]string{applicationName, constant.AffinityRuleSuffix}, ""), a) } return a } func (s *ApplicationAffinityRoute) Notify(invokers []protocol.Invoker) { if len(invokers) == 0 { return } url := invokers[0].GetURL() if url == nil { logger.Error("Failed to notify a dynamically affinity rule, because url is empty") return } dynamicConfiguration := conf.GetEnvInstance().GetDynamicConfiguration() if dynamicConfiguration == nil { logger.Infof("Config center does not start, Affinity router will not be enabled") return } providerApplication := url.GetParam("application", "") if providerApplication == "" || providerApplication == s.currentApplication { logger.Warn("Affinity router get providerApplication is empty, will not subscribe to provider app rules.") return } if providerApplication != s.application { if s.application != "" { dynamicConfiguration.RemoveListener(strings.Join([]string{s.application, constant.AffinityRuleSuffix}, ""), s) } s.application = providerApplication key := strings.Join([]string{providerApplication, constant.AffinityRuleSuffix}, "") dynamicConfiguration.AddListener(key, s) value, err := dynamicConfiguration.GetRule(key) if err != nil { logger.Errorf("Failed to query affinity rule, key=%s, err=%v", key, err) return } s.Process(&config_center.ConfigChangeEvent{Key: key, Value: value, ConfigType: remoting.EventTypeUpdate}) } } type affinityRoute struct { mu sync.RWMutex matcher *condition.FieldMatcher enabled bool key string ratio int32 } func (a *affinityRoute) Process(event *config_center.ConfigChangeEvent) { a.mu.Lock() defer a.mu.Unlock() a.matcher, a.enabled, a.key, a.ratio = nil, false, "", 0 switch event.ConfigType { case remoting.EventTypeDel: case remoting.EventTypeAdd, remoting.EventTypeUpdate: cfg, err := parseConfig(event.Value.(string)) if err != nil { logger.Errorf("Failed to parse affinity config, key=%s, err=%v", a.key, err) return } if cfg.AffinityAware.Ratio < 0 || cfg.AffinityAware.Ratio > 100 { logger.Errorf("Failed to parse affinity config, affinity.ratio=%d, expect 0-100", a.ratio) return } key := strings.TrimSpace(cfg.AffinityAware.Key) if !cfg.Enabled || key == "" { return } rule := strings.Join([]string{key, key}, "=$") f, err := condition.NewFieldMatcher(rule) if err != nil { logger.Errorf("Failed to parse affinity config, key=%s, rule=%s ,err=%v", a.key, rule, err) return } a.matcher, a.enabled, a.key, a.ratio = &f, true, key, cfg.AffinityAware.Ratio } } func (a *affinityRoute) Route(invokers []protocol.Invoker, url *common.URL, invocation protocol.Invocation) []protocol.Invoker { if len(invokers) == 0 { return invokers } a.mu.RLock() enabled, matcher, ratio := a.enabled, a.matcher, a.ratio a.mu.RUnlock() if !enabled { return invokers } res := make([]protocol.Invoker, 0, len(invokers)) for _, invoker := range invokers { if matcher.MatchInvoker(url, invoker, invocation) { res = append(res, invoker) } } if float32(len(res))/float32(len(invokers)) >= float32(ratio)/float32(100) { return res } return invokers } func (a *affinityRoute) URL() *common.URL { return nil } func (a *affinityRoute) Priority() int64 { // expect this router is the last one in the router chain return math.MinInt64 } func (a *affinityRoute) Notify(_ []protocol.Invoker) { panic("this function should not be called") } func parseConfig(c string) (config.AffinityRouter, error) { res := config.AffinityRouter{} err := yaml.Unmarshal([]byte(c), &res) return res, err }