core/hotspot/rule_manager.go (350 lines of code) (raw):

// Copyright 1999-2020 Alibaba Group Holding Ltd. // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. package hotspot import ( "fmt" "reflect" "sync" "github.com/alibaba/sentinel-golang/logging" "github.com/alibaba/sentinel-golang/util" "github.com/pkg/errors" ) // TrafficControllerGenFunc represents the TrafficShapingController generator function of a specific control behavior. type TrafficControllerGenFunc func(r *Rule, reuseMetric *ParamsMetric) TrafficShapingController // trafficControllerMap represents the map storage for TrafficShapingController. type trafficControllerMap map[string][]TrafficShapingController var ( tcGenFuncMap = make(map[ControlBehavior]TrafficControllerGenFunc, 4) tcMap = make(trafficControllerMap) tcMux = new(sync.RWMutex) currentRules = make(map[string][]*Rule, 0) updateRuleMux = new(sync.Mutex) ) func init() { // Initialize the traffic shaping controller generator map for existing control behaviors. tcGenFuncMap[Reject] = func(r *Rule, reuseMetric *ParamsMetric) TrafficShapingController { var baseTc *baseTrafficShapingController if reuseMetric != nil { // new BaseTrafficShapingController with reuse statistic metric baseTc = newBaseTrafficShapingControllerWithMetric(r, reuseMetric) } else { baseTc = newBaseTrafficShapingController(r) } if baseTc == nil { return nil } return &rejectTrafficShapingController{ baseTrafficShapingController: *baseTc, burstCount: r.BurstCount, } } tcGenFuncMap[Throttling] = func(r *Rule, reuseMetric *ParamsMetric) TrafficShapingController { var baseTc *baseTrafficShapingController if reuseMetric != nil { baseTc = newBaseTrafficShapingControllerWithMetric(r, reuseMetric) } else { baseTc = newBaseTrafficShapingController(r) } if baseTc == nil { return nil } return &throttlingTrafficShapingController{ baseTrafficShapingController: *baseTc, maxQueueingTimeMs: r.MaxQueueingTimeMs, } } } func getTrafficControllersFor(res string) []TrafficShapingController { tcMux.RLock() defer tcMux.RUnlock() return tcMap[res] } // LoadRules replaces all old hotspot param flow rules with the given rules. // Return value: // // bool: indicates whether the internal map has been changed; // error: indicates whether occurs the error. func LoadRules(rules []*Rule) (bool, error) { resRulesMap := make(map[string][]*Rule, 16) for _, rule := range rules { resRules, exists := resRulesMap[rule.Resource] if !exists { resRules = make([]*Rule, 0, 1) } resRulesMap[rule.Resource] = append(resRules, rule) } updateRuleMux.Lock() defer updateRuleMux.Unlock() isEqual := reflect.DeepEqual(currentRules, resRulesMap) if isEqual { logging.Info("[HotSpot] Load rules is the same with current rules, so ignore load operation.") return false, nil } err := onRuleUpdate(resRulesMap) return true, err } // GetRules returns all the hotspot param flow rules based on copy. // It doesn't take effect for hotspot module if user changes the returned rules. // GetRules need to compete hotspot module's global lock and the high performance losses of copy, // reduce or do not call GetRules if possible. func GetRules() []Rule { tcMux.RLock() rules := rulesFrom(tcMap) tcMux.RUnlock() ret := make([]Rule, 0, len(rules)) for _, rule := range rules { ret = append(ret, *rule) } return ret } // GetRulesOfResource returns specific resource's hotspot parameter flow control rules based on copy. // It doesn't take effect for hotspot module if user changes the returned rules. // GetRulesOfResource need to compete hotspot module's global lock and the high performance losses of copy, // // reduce or do not call GetRulesOfResource frequently if possible. func GetRulesOfResource(res string) []Rule { tcMux.RLock() resTcs := tcMap[res] tcMux.RUnlock() ret := make([]Rule, 0, len(resTcs)) for _, tc := range resTcs { ret = append(ret, *tc.BoundRule()) } return ret } // ClearRules clears all hotspot param flow rules. func ClearRules() error { _, err := LoadRules(nil) return err } // ClearRulesOfResource clears resource level hotspot param flow rules. func ClearRulesOfResource(res string) error { _, err := LoadRulesOfResource(res, nil) return err } func onRuleUpdate(rawResRulesMap map[string][]*Rule) (err error) { defer func() { if r := recover(); r != nil { var ok bool err, ok = r.(error) if !ok { err = fmt.Errorf("%+v", r) } } }() // ignore invalid rules validResRulesMap := make(map[string][]*Rule, len(rawResRulesMap)) for res, rules := range rawResRulesMap { validResRules := make([]*Rule, 0, len(rules)) for _, rule := range rules { if err := IsValidRule(rule); err != nil { logging.Warn("[HotSpot onRuleUpdate] Ignoring invalid hotspot param flow rule when loading new rules", "rule", rule, "err", err.Error()) continue } validResRules = append(validResRules, rule) } if len(validResRules) > 0 { validResRulesMap[res] = validResRules } } start := util.CurrentTimeNano() tcMux.RLock() tcMapClone := make(trafficControllerMap, len(tcMap)) for res, tcs := range tcMap { resTcClone := make([]TrafficShapingController, 0, len(tcs)) resTcClone = append(resTcClone, tcs...) tcMapClone[res] = resTcClone } tcMux.RUnlock() m := make(trafficControllerMap, len(validResRulesMap)) for res, rules := range validResRulesMap { m[res] = buildResourceTrafficShapingController(res, rules, tcMapClone[res]) } tcMux.Lock() tcMap = m tcMux.Unlock() currentRules = rawResRulesMap logging.Debug("[HotSpot onRuleUpdate] Time statistic(ns) for updating hotspot param flow rules", "timeCost", util.CurrentTimeNano()-start) logRuleUpdate(validResRulesMap) return nil } func onResourceRuleUpdate(res string, rawResRules []*Rule) (err error) { defer func() { if r := recover(); r != nil { var ok bool err, ok = r.(error) if !ok { err = fmt.Errorf("%v", r) } } }() validResRules := make([]*Rule, 0, len(rawResRules)) for _, rule := range rawResRules { if err := IsValidRule(rule); err != nil { logging.Warn("[HotSpot onResourceRuleUpdate] Ignoring invalid hotspot param flow rule", "rule", rule, "reason", err.Error()) continue } validResRules = append(validResRules, rule) } start := util.CurrentTimeNano() oldResTcs := make([]TrafficShapingController, 0, 8) tcMux.RLock() oldResTcs = append(oldResTcs, tcMap[res]...) tcMux.RUnlock() newResTcs := buildResourceTrafficShapingController(res, validResRules, oldResTcs) tcMux.Lock() if len(newResTcs) == 0 { delete(tcMap, res) } else { tcMap[res] = newResTcs } tcMux.Unlock() currentRules[res] = rawResRules logging.Debug("[HotSpot onResourceRuleUpdate] Time statistic(ns) for updating hotspot param flow rules", "timeCost", util.CurrentTimeNano()-start) logging.Info("[HotSpot] load resource level hotspot param flow rules", "resource", res, "validResRules", validResRules) return nil } // LoadRulesOfResource loads the given resource's hotspot param flow rules to the rule manager, // while all previous resource's rules will be replaced. The first returned value indicates whether // do real load operation, if the rules is the same with previous resource's rules, return false. func LoadRulesOfResource(res string, rules []*Rule) (bool, error) { if len(res) == 0 { return false, errors.New("empty resource") } updateRuleMux.Lock() defer updateRuleMux.Unlock() // clear resource rules if len(rules) == 0 { // clear resource's currentRules delete(currentRules, res) // clear tcMap tcMux.Lock() delete(tcMap, res) tcMux.Unlock() logging.Info("[HotSpot] clear resource level hotspot param flow rules", "resource", res) return true, nil } // load resource level rules isEqual := reflect.DeepEqual(currentRules[res], rules) if isEqual { logging.Info("[HotSpot] Load resource level hotspot param flow rules is the same with current resource level rules, so ignore load operation.") return false, nil } err := onResourceRuleUpdate(res, rules) return true, err } func logRuleUpdate(m map[string][]*Rule) { rules := make([]*Rule, 0, 8) for _, rs := range m { if len(rs) == 0 { continue } rules = append(rules, rs...) } if len(rules) == 0 { logging.Info("[HotspotRuleManager] Hotspot param flow rules were cleared") } else { logging.Info("[HotspotRuleManager] Hotspot param flow rules were loaded", "rules", rules) } } func rulesFrom(m trafficControllerMap) []*Rule { rules := make([]*Rule, 0, 8) if len(m) == 0 { return rules } for _, rs := range m { if len(rs) == 0 { continue } for _, r := range rs { if r != nil && r.BoundRule() != nil { rules = append(rules, r.BoundRule()) } } } return rules } func calculateReuseIndexFor(r *Rule, oldResTcs []TrafficShapingController) (equalIdx, reuseStatIdx int) { // the index of equivalent rule in old traffic shaping controller slice equalIdx = -1 // the index of statistic reusable rule in old traffic shaping controller slice reuseStatIdx = -1 for idx, oldTc := range oldResTcs { oldRule := oldTc.BoundRule() if oldRule.Equals(r) { // break if there is equivalent rule equalIdx = idx break } // find the index of first StatReusable rule if !oldRule.IsStatReusable(r) { continue } if reuseStatIdx >= 0 { // had find reuse rule. continue } reuseStatIdx = idx } return equalIdx, reuseStatIdx } // buildResourceTrafficShapingController builds TrafficShapingController slice from rules. the resource of rules must be equals to res. func buildResourceTrafficShapingController(res string, resRules []*Rule, oldResTcs []TrafficShapingController) []TrafficShapingController { newTcsOfRes := make([]TrafficShapingController, 0, len(resRules)) for _, rule := range resRules { if res != rule.Resource { logging.Error(errors.Errorf("unmatched resource name, expect: %s, actual: %s", res, rule.Resource), "Unmatched resource name in hotspot.buildResourceTrafficShapingController()", "rule", rule) continue } equalIdx, reuseStatIdx := calculateReuseIndexFor(rule, oldResTcs) // there is equivalent rule in old traffic shaping controller slice if equalIdx >= 0 { equalOldTC := oldResTcs[equalIdx] newTcsOfRes = append(newTcsOfRes, equalOldTC) // remove old tc from old resTcs oldResTcs = append(oldResTcs[:equalIdx], oldResTcs[equalIdx+1:]...) continue } // generate new traffic shaping controller generator, supported := tcGenFuncMap[rule.ControlBehavior] if !supported { logging.Warn("[HotSpot buildResourceTrafficShapingController] Ignoring the hotspot param flow rule due to unsupported control behavior", "rule", rule) continue } var tc TrafficShapingController if reuseStatIdx >= 0 { // generate new traffic shaping controller with reusable statistic metric. tc = generator(rule, oldResTcs[reuseStatIdx].BoundMetric()) // remove the reused traffic shaping controller old res tcs oldResTcs = append(oldResTcs[:reuseStatIdx], oldResTcs[reuseStatIdx+1:]...) } else { tc = generator(rule, nil) } if tc == nil { logging.Debug("[HotSpot buildResourceTrafficShapingController] Ignoring the hotspot param flow rule due to bad generated traffic controller", "rule", rule) continue } newTcsOfRes = append(newTcsOfRes, tc) } return newTcsOfRes } func IsValidRule(rule *Rule) error { if rule == nil { return errors.New("nil hotspot Rule") } if len(rule.Resource) == 0 { return errors.New("empty resource name") } if rule.Threshold < 0 { return errors.New("negative threshold") } if rule.MetricType < 0 { return errors.New("invalid metric type") } if rule.ControlBehavior < 0 { return errors.New("invalid control strategy") } if rule.MetricType == QPS && rule.DurationInSec <= 0 { return errors.New("invalid duration") } if rule.ParamIndex > 0 && rule.ParamKey != "" { return errors.New("invalid param index and param key are mutually exclusive") } return checkControlBehaviorField(rule) } func checkControlBehaviorField(rule *Rule) error { switch rule.ControlBehavior { case Reject: if rule.BurstCount < 0 { return errors.New("invalid BurstCount") } return nil case Throttling: if rule.MaxQueueingTimeMs < 0 { return errors.New("invalid MaxQueueingTimeMs") } return nil default: } return nil } // SetTrafficShapingGenerator sets the traffic controller generator for the given control behavior. // Note that modifying the generator of default control behaviors is not allowed. func SetTrafficShapingGenerator(cb ControlBehavior, generator TrafficControllerGenFunc) error { if generator == nil { return errors.New("nil generator") } if cb >= Reject && cb <= Throttling { return errors.New("not allowed to replace the generator for default control behaviors") } tcMux.Lock() defer tcMux.Unlock() tcGenFuncMap[cb] = generator return nil } func RemoveTrafficShapingGenerator(cb ControlBehavior) error { if cb >= Reject && cb <= Throttling { return errors.New("not allowed to replace the generator for default control behaviors") } tcMux.Lock() defer tcMux.Unlock() delete(tcGenFuncMap, cb) return nil }