cluster/router/condition/route.go (387 lines of code) (raw):

/* * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ package condition import ( "math/rand" "regexp" "sort" "strings" "sync" ) import ( "github.com/dubbogo/gost/log/logger" "github.com/pkg/errors" "gopkg.in/yaml.v2" ) import ( "dubbo.apache.org/dubbo-go/v3/cluster/router/condition/matcher" "dubbo.apache.org/dubbo-go/v3/common" "dubbo.apache.org/dubbo-go/v3/common/constant" "dubbo.apache.org/dubbo-go/v3/config" "dubbo.apache.org/dubbo-go/v3/protocol" ) var ( routePattern = regexp.MustCompile("([&!=,]*)\\s*([^&!=,\\s]+)") illegalMsg = "Illegal route rule \"%s\", The error char '%s' before '%s'" matcherFactories = make([]matcher.ConditionMatcherFactory, 0, 8) once sync.Once ) type StateRouter struct { whenCondition map[string]matcher.Matcher thenCondition map[string]matcher.Matcher } func NewConditionStateRouter(url *common.URL) (*StateRouter, error) { once.Do(initMatcherFactories) if len(matcherFactories) == 0 { return nil, errors.Errorf("No ConditionMatcherFactory exists") } c := &StateRouter{} when, then, err := generateMatcher(url) if err != nil { return nil, err } c.whenCondition = when c.thenCondition = then return c, nil } // Route Determine the target invokers list. // condition rule like `self_condition => peers_condition ` // // @return active_peers_invokers, Is_self_condition_match_success func (s *StateRouter) Route(invokers []protocol.Invoker, url *common.URL, invocation protocol.Invocation) []protocol.Invoker { if len(invokers) == 0 { return invokers } if !s.matchWhen(url, invocation) { return invokers } if len(s.thenCondition) == 0 { logger.Warn("condition state router thenCondition is empty") return []protocol.Invoker{} } var result = make([]protocol.Invoker, 0, len(invokers)) for _, invoker := range invokers { if s.matchThen(invoker.GetURL(), url) { result = append(result, invoker) } } return result } func (s *StateRouter) matchWhen(url *common.URL, invocation protocol.Invocation) bool { if len(s.whenCondition) == 0 { return true } return doMatch(url, nil, invocation, s.whenCondition, true) } func (s *StateRouter) matchThen(url *common.URL, param *common.URL) bool { if len(s.thenCondition) == 0 { return false } return doMatch(url, param, nil, s.thenCondition, false) } func generateMatcher(url *common.URL) (when, then map[string]matcher.Matcher, err error) { rule := url.GetParam(constant.RuleKey, "") if rule == "" || len(strings.Trim(rule, " ")) == 0 { return nil, nil, errors.Errorf("Illegal route rule!") } rule = strings.Replace(rule, "consumer.", "", -1) rule = strings.Replace(rule, "provider.", "", -1) i := strings.Index(rule, "=>") // for the case of `{when rule} => {then rule}` var whenRule string var thenRule string if i < 0 { whenRule = "" thenRule = strings.Trim(rule, " ") } else { whenRule = strings.Trim(rule[0:i], " ") thenRule = strings.Trim(rule[i+2:], " ") } when, err = parseWhen(whenRule) if err != nil { return nil, nil, err } then, err = parseThen(thenRule) if err != nil { return nil, nil, err } // NOTE: It should be determined on the business level whether the `When condition` can be empty or not. return when, then, nil } func parseWhen(whenRule string) (map[string]matcher.Matcher, error) { if whenRule == "" || whenRule == " " || whenRule == "true" { return make(map[string]matcher.Matcher), nil } else { when, err := parseRule(whenRule) if err != nil { return nil, err } return when, nil } } func parseThen(thenRule string) (map[string]matcher.Matcher, error) { if thenRule == "" || thenRule == " " || thenRule == "false" { return nil, nil } else { when, err := parseRule(thenRule) if err != nil { return nil, err } return when, nil } } func parseRule(rule string) (map[string]matcher.Matcher, error) { if isRuleEmpty(rule) { return make(map[string]matcher.Matcher), nil } condition, err := processMatchers(rule) if err != nil { return nil, err } return condition, nil } func isRuleEmpty(rule string) bool { return rule == "" || (len(rule) == 1 && rule[0] == ' ') } func processMatchers(rule string) (map[string]matcher.Matcher, error) { condition := make(map[string]matcher.Matcher) var currentMatcher matcher.Matcher var err error values := make(map[string]struct{}) for _, matchers := range routePattern.FindAllStringSubmatch(rule, -1) { separator := matchers[1] content := matchers[2] switch separator { case "": currentMatcher = getMatcher(content) condition[content] = currentMatcher case "&": currentMatcher, condition = processAndSeparator(content, condition) case "=", "!=": values, currentMatcher, err = processEqualNotEqualSeparator(separator, content, currentMatcher, rule) if err != nil { return nil, err } case ",": values, err = processCommaSeparator(content, values, rule) if err != nil { return nil, err } default: return nil, errors.Errorf(illegalMsg, rule, separator, content) } } return condition, nil } func processAndSeparator(content string, condition map[string]matcher.Matcher) (matcher.Matcher, map[string]matcher.Matcher) { currentMatcher := condition[content] if currentMatcher == nil { currentMatcher = getMatcher(content) condition[content] = currentMatcher } return currentMatcher, condition } func processEqualNotEqualSeparator(separator, content string, currentMatcher matcher.Matcher, rule string) (map[string]struct{}, matcher.Matcher, error) { if currentMatcher == nil { return nil, nil, errors.Errorf(illegalMsg, rule, separator, content) } values := currentMatcher.GetMatches() if separator == "!=" { values = currentMatcher.GetMismatches() } values[content] = struct{}{} return values, currentMatcher, nil } func processCommaSeparator(content string, values map[string]struct{}, rule string) (map[string]struct{}, error) { if len(values) == 0 { return nil, errors.Errorf(illegalMsg, rule, ",", content) } values[content] = struct{}{} return values, nil } func getMatcher(key string) matcher.Matcher { for _, factory := range matcherFactories { if factory.ShouldMatch(key) { return factory.NewMatcher(key) } } return matcher.GetMatcherFactory(constant.Param).NewMatcher(key) } func doMatch(url *common.URL, param *common.URL, invocation protocol.Invocation, conditions map[string]matcher.Matcher, isWhenCondition bool) bool { sample := url.ToMap() for _, matcherPair := range conditions { if !matcher.Match(matcherPair, sample, param, invocation, isWhenCondition) { return false } } return true } func initMatcherFactories() { factoriesMap := matcher.GetMatcherFactories() if len(factoriesMap) == 0 { return } for _, factory := range factoriesMap { matcherFactories = append(matcherFactories, factory()) } sortMatcherFactories(matcherFactories) } func sortMatcherFactories(matcherFactories []matcher.ConditionMatcherFactory) { sort.Stable(byPriority(matcherFactories)) } type byPriority []matcher.ConditionMatcherFactory func (a byPriority) Len() int { return len(a) } func (a byPriority) Swap(i, j int) { a[i], a[j] = a[j], a[i] } func (a byPriority) Less(i, j int) bool { return a[i].Priority() < a[j].Priority() } func parseConditionRoute(routeContent string) (*config.RouterConfig, error) { routeDecoder := yaml.NewDecoder(strings.NewReader(routeContent)) routerConfig := &config.RouterConfig{} err := routeDecoder.Decode(routerConfig) if err != nil { return nil, err } return routerConfig, nil } type FieldMatcher struct { rule string match map[string]matcher.Matcher } func NewFieldMatcher(rule string) (FieldMatcher, error) { m, err := parseRule(rule) if err != nil { return FieldMatcher{}, err } return FieldMatcher{rule: rule, match: m}, nil } func (m *FieldMatcher) MatchRequest(url *common.URL, invocation protocol.Invocation) bool { return doMatch(url, nil, invocation, m.match, true) } func (m *FieldMatcher) MatchInvoker(url *common.URL, ivk protocol.Invoker, invocation protocol.Invocation) bool { return doMatch(ivk.GetURL(), url, nil, m.match, false) } // MultiDestRouter Multiply-Destination-Router type MultiDestRouter struct { whenCondition FieldMatcher thenCondition []condSet } type condSet struct { FieldMatcher subSetWeight int } func newCondSet(rule string, subSetWeight int) (condSet, error) { if subSetWeight <= 0 { subSetWeight = constant.DefaultRouteConditionSubSetWeight } m, err := NewFieldMatcher(rule) if err != nil { return condSet{}, err } return condSet{FieldMatcher: m, subSetWeight: subSetWeight}, nil } type destination struct { matchRule string weight int ivks []protocol.Invoker } type destSets struct { destinations []*destination weightSum int } func newDestSets() *destSets { return &destSets{ destinations: []*destination{}, weightSum: 0, } } func (s *destSets) addDest(weight int, rule string, ivks []protocol.Invoker) { s.destinations = append(s.destinations, &destination{weight: weight, matchRule: rule, ivks: ivks}) s.weightSum += weight } func (s *destSets) randDest() *destination { if s.weightSum == 0 { return nil } if len(s.destinations) == 1 { return s.destinations[0] } sum := rand.Intn(s.weightSum) for _, d := range s.destinations { sum -= d.weight if sum <= 0 { return d } } return nil } func (m MultiDestRouter) Route(invokers []protocol.Invoker, url *common.URL, invocation protocol.Invocation) ([]protocol.Invoker, bool) { if len(invokers) == 0 { return invokers, false } if !m.whenCondition.MatchRequest(url, invocation) { return invokers, false } if len(m.thenCondition) == 0 { logger.Warn("condition state router thenCondition is empty") return []protocol.Invoker{}, true } destinations := newDestSets() for _, condition := range m.thenCondition { res := make([]protocol.Invoker, 0) for _, invoker := range invokers { if condition.MatchInvoker(url, invoker, invocation) { res = append(res, invoker) } } if len(res) != 0 { destinations.addDest(condition.subSetWeight, condition.rule, res) } } // use to print log, if route empty i, ok := invocation.Attributes()["condition-chain"].([]string) if !ok { i = []string{} } d := destinations.randDest() if d != nil { invocation.Attributes()["condition-chain"] = append(i, "request="+m.whenCondition.rule+",invokers="+d.matchRule) return d.ivks, true } thenRule := make([]string, 0, len(m.thenCondition)) for _, set := range m.thenCondition { thenRule = append(thenRule, set.rule) } invocation.Attributes()["condition-chain"] = append(i, "request="+m.whenCondition.rule+",invokers!="+strings.Join(thenRule, ",")) return []protocol.Invoker{}, true } func NewConditionMultiDestRouter(url *common.URL) (*MultiDestRouter, error) { once.Do(initMatcherFactories) if len(matcherFactories) == 0 { return nil, errors.Errorf("No ConditionMatcherFactory exists") } rawCondConf, ok := url.GetAttribute(constant.RuleKey) if !ok { return nil, errors.Errorf("Condition Router can't get the rule key") } condConf, ok := rawCondConf.(*config.ConditionRule) if !ok { return nil, errors.Errorf("Condition Router get the rule key invaild , got %T", rawCondConf) } // ensure config effective if (condConf.To == nil || len(condConf.To) == 0) && condConf.From.Match == "" { return nil, nil } c := &MultiDestRouter{ thenCondition: make([]condSet, 0, len(condConf.To)), } var err error c.whenCondition, err = NewFieldMatcher(condConf.From.Match) if err != nil { return nil, err } for _, ruleTo := range condConf.To { cs, err := newCondSet(ruleTo.Match, ruleTo.Weight) if err != nil { return nil, err } c.thenCondition = append(c.thenCondition, cs) } return c, nil } func parseMultiConditionRoute(routeContent string) (*config.ConditionRouter, error) { routeDecoder := yaml.NewDecoder(strings.NewReader(routeContent)) routerConfig := &config.ConditionRouter{} err := routeDecoder.Decode(routerConfig) if err != nil { return nil, err } return routerConfig, nil }