core/flow/rule_manager.go (542 lines of code) (raw):

// Copyright 1999-2020 Alibaba Group Holding Ltd. // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. package flow import ( "fmt" "reflect" "sync" "github.com/alibaba/sentinel-golang/core/base" "github.com/alibaba/sentinel-golang/core/config" "github.com/alibaba/sentinel-golang/core/stat" sbase "github.com/alibaba/sentinel-golang/core/stat/base" "github.com/alibaba/sentinel-golang/core/system_metric" "github.com/alibaba/sentinel-golang/logging" "github.com/alibaba/sentinel-golang/util" "github.com/pkg/errors" ) // TrafficControllerGenFunc represents the TrafficShapingController generator function of a specific control behavior. type TrafficControllerGenFunc func(*Rule, *standaloneStatistic) (*TrafficShapingController, error) type trafficControllerGenKey struct { tokenCalculateStrategy TokenCalculateStrategy controlBehavior ControlBehavior } // TrafficControllerMap represents the map storage for TrafficShapingController. type TrafficControllerMap map[string][]*TrafficShapingController var ( tcGenFuncMap = make(map[trafficControllerGenKey]TrafficControllerGenFunc, 6) tcMap = make(TrafficControllerMap) tcMux = new(sync.RWMutex) nopStat = &standaloneStatistic{ reuseResourceStat: false, readOnlyMetric: base.NopReadStat(), writeOnlyMetric: base.NopWriteStat(), } currentRules = make(map[string][]*Rule, 0) updateRuleMux = new(sync.Mutex) ) func init() { // Initialize the traffic shaping controller generator map for existing control behaviors. tcGenFuncMap[trafficControllerGenKey{ tokenCalculateStrategy: Direct, controlBehavior: Reject, }] = func(rule *Rule, boundStat *standaloneStatistic) (*TrafficShapingController, error) { if boundStat == nil { var err error boundStat, err = generateStatFor(rule) if err != nil { return nil, err } } tsc, err := NewTrafficShapingController(rule, boundStat) if err != nil || tsc == nil { return nil, err } tsc.flowCalculator = NewDirectTrafficShapingCalculator(tsc, rule.Threshold) tsc.flowChecker = NewRejectTrafficShapingChecker(tsc, rule) return tsc, nil } tcGenFuncMap[trafficControllerGenKey{ tokenCalculateStrategy: Direct, controlBehavior: Throttling, }] = func(rule *Rule, _ *standaloneStatistic) (*TrafficShapingController, error) { // Direct token calculate strategy and throttling control behavior don't use stat, so we just give a nop stat. tsc, err := NewTrafficShapingController(rule, nopStat) if err != nil || tsc == nil { return nil, err } tsc.flowCalculator = NewDirectTrafficShapingCalculator(tsc, rule.Threshold) tsc.flowChecker = NewThrottlingChecker(tsc, rule.MaxQueueingTimeMs, rule.StatIntervalInMs) return tsc, nil } tcGenFuncMap[trafficControllerGenKey{ tokenCalculateStrategy: WarmUp, controlBehavior: Reject, }] = func(rule *Rule, boundStat *standaloneStatistic) (*TrafficShapingController, error) { if boundStat == nil { var err error boundStat, err = generateStatFor(rule) if err != nil { return nil, err } } tsc, err := NewTrafficShapingController(rule, boundStat) if err != nil || tsc == nil { return nil, err } tsc.flowCalculator = NewWarmUpTrafficShapingCalculator(tsc, rule) tsc.flowChecker = NewRejectTrafficShapingChecker(tsc, rule) return tsc, nil } tcGenFuncMap[trafficControllerGenKey{ tokenCalculateStrategy: WarmUp, controlBehavior: Throttling, }] = func(rule *Rule, boundStat *standaloneStatistic) (*TrafficShapingController, error) { if boundStat == nil { var err error boundStat, err = generateStatFor(rule) if err != nil { return nil, err } } tsc, err := NewTrafficShapingController(rule, boundStat) if err != nil || tsc == nil { return nil, err } tsc.flowCalculator = NewWarmUpTrafficShapingCalculator(tsc, rule) tsc.flowChecker = NewThrottlingChecker(tsc, rule.MaxQueueingTimeMs, rule.StatIntervalInMs) return tsc, nil } tcGenFuncMap[trafficControllerGenKey{ tokenCalculateStrategy: MemoryAdaptive, controlBehavior: Reject, }] = func(rule *Rule, boundStat *standaloneStatistic) (*TrafficShapingController, error) { if boundStat == nil { var err error boundStat, err = generateStatFor(rule) if err != nil { return nil, err } } tsc, err := NewTrafficShapingController(rule, boundStat) if err != nil || tsc == nil { return nil, err } tsc.flowCalculator = NewMemoryAdaptiveTrafficShapingCalculator(tsc, rule) tsc.flowChecker = NewRejectTrafficShapingChecker(tsc, rule) return tsc, nil } tcGenFuncMap[trafficControllerGenKey{ tokenCalculateStrategy: MemoryAdaptive, controlBehavior: Throttling, }] = func(rule *Rule, _ *standaloneStatistic) (*TrafficShapingController, error) { // MemoryAdaptive token calculate strategy and throttling control behavior don't use stat, so we just give a nop stat. tsc, err := NewTrafficShapingController(rule, nopStat) if err != nil || tsc == nil { return nil, err } tsc.flowCalculator = NewMemoryAdaptiveTrafficShapingCalculator(tsc, rule) tsc.flowChecker = NewThrottlingChecker(tsc, rule.MaxQueueingTimeMs, rule.StatIntervalInMs) return tsc, nil } } func logRuleUpdate(m map[string][]*Rule) { rules := make([]*Rule, 0, 8) for _, rs := range m { if len(rs) == 0 { continue } rules = append(rules, rs...) } if len(rules) == 0 { logging.Info("[FlowRuleManager] Flow rules were cleared") } else { logging.Info("[FlowRuleManager] Flow rules were loaded", "rules", rules) } } func onRuleUpdate(rawResRulesMap map[string][]*Rule) (err error) { defer func() { if r := recover(); r != nil { var ok bool err, ok = r.(error) if !ok { err = fmt.Errorf("%v", r) } } }() // ignore invalid rules validResRulesMap := make(map[string][]*Rule, len(rawResRulesMap)) for res, rules := range rawResRulesMap { validResRules := make([]*Rule, 0, len(rules)) for _, rule := range rules { if err := IsValidRule(rule); err != nil { logging.Warn("[Flow onRuleUpdate] Ignoring invalid flow rule", "rule", rule, "reason", err.Error()) continue } validResRules = append(validResRules, rule) } if len(validResRules) > 0 { validResRulesMap[res] = validResRules } } start := util.CurrentTimeNano() tcMux.RLock() tcMapClone := make(TrafficControllerMap, len(validResRulesMap)) for res, tcs := range tcMap { resTcClone := make([]*TrafficShapingController, 0, len(tcs)) resTcClone = append(resTcClone, tcs...) tcMapClone[res] = resTcClone } tcMux.RUnlock() m := make(TrafficControllerMap, len(validResRulesMap)) for res, rulesOfRes := range validResRulesMap { newTcsOfRes := buildResourceTrafficShapingController(res, rulesOfRes, tcMapClone[res]) if len(newTcsOfRes) > 0 { m[res] = newTcsOfRes } } tcMux.Lock() tcMap = m tcMux.Unlock() currentRules = rawResRulesMap logging.Debug("[Flow onRuleUpdate] Time statistic(ns) for updating flow rule", "timeCost", util.CurrentTimeNano()-start) logRuleUpdate(validResRulesMap) return nil } // LoadRules loads the given flow rules to the rule manager, while all previous rules will be replaced. // the first returned value indicates whether do real load operation, if the rules is the same with previous rules, return false func LoadRules(rules []*Rule) (bool, error) { resRulesMap := make(map[string][]*Rule, 16) for _, rule := range rules { resRules, exist := resRulesMap[rule.Resource] if !exist { resRules = make([]*Rule, 0, 1) } resRulesMap[rule.Resource] = append(resRules, rule) } updateRuleMux.Lock() defer updateRuleMux.Unlock() isEqual := reflect.DeepEqual(currentRules, resRulesMap) if isEqual { logging.Info("[Flow] Load rules is the same with current rules, so ignore load operation.") return false, nil } err := onRuleUpdate(resRulesMap) return true, err } func onResourceRuleUpdate(res string, rawResRules []*Rule) (err error) { defer func() { if r := recover(); r != nil { var ok bool err, ok = r.(error) if !ok { err = fmt.Errorf("%v", r) } } }() validResRules := make([]*Rule, 0, len(rawResRules)) for _, rule := range rawResRules { if err := IsValidRule(rule); err != nil { logging.Warn("[Flow onResourceRuleUpdate] Ignoring invalid flow rule", "rule", rule, "reason", err.Error()) continue } validResRules = append(validResRules, rule) } start := util.CurrentTimeNano() oldResTcs := make([]*TrafficShapingController, 0) tcMux.RLock() oldResTcs = append(oldResTcs, tcMap[res]...) tcMux.RUnlock() newResTcs := buildResourceTrafficShapingController(res, validResRules, oldResTcs) tcMux.Lock() if len(newResTcs) == 0 { delete(tcMap, res) } else { tcMap[res] = newResTcs } tcMux.Unlock() currentRules[res] = rawResRules logging.Debug("[Flow onResourceRuleUpdate] Time statistic(ns) for updating flow rule", "timeCost", util.CurrentTimeNano()-start) logging.Info("[Flow] load resource level rules", "resource", res, "validResRules", validResRules) return nil } // LoadRulesOfResource loads the given resource's flow rules to the rule manager, while all previous resource's rules will be replaced. // the first returned value indicates whether do real load operation, if the rules is the same with previous resource's rules, return false func LoadRulesOfResource(res string, rules []*Rule) (bool, error) { if len(res) == 0 { return false, errors.New("empty resource") } updateRuleMux.Lock() defer updateRuleMux.Unlock() // clear resource rules if len(rules) == 0 { // clear resource's currentRules delete(currentRules, res) // clear tcMap tcMux.Lock() delete(tcMap, res) tcMux.Unlock() logging.Info("[Flow] clear resource level rules", "resource", res) return true, nil } // load resource level rules isEqual := reflect.DeepEqual(currentRules[res], rules) if isEqual { logging.Info("[Flow] Load resource level rules is the same with current resource level rules, so ignore load operation.") return false, nil } err := onResourceRuleUpdate(res, rules) return true, err } // getRules returns all the rules。Any changes of rules take effect for flow module // getRules is an internal interface. func getRules() []*Rule { tcMux.RLock() defer tcMux.RUnlock() return rulesFrom(tcMap) } // getRulesOfResource returns specific resource's rules。Any changes of rules take effect for flow module // getRulesOfResource is an internal interface. func getRulesOfResource(res string) []*Rule { tcMux.RLock() defer tcMux.RUnlock() resTcs, exist := tcMap[res] if !exist { return nil } ret := make([]*Rule, 0, len(resTcs)) for _, tc := range resTcs { ret = append(ret, tc.BoundRule()) } return ret } // GetRules returns all the rules based on copy. // It doesn't take effect for flow module if user changes the rule. func GetRules() []Rule { rules := getRules() ret := make([]Rule, 0, len(rules)) for _, rule := range rules { ret = append(ret, *rule) } return ret } // GetRulesOfResource returns specific resource's rules based on copy. // It doesn't take effect for flow module if user changes the rule. func GetRulesOfResource(res string) []Rule { rules := getRulesOfResource(res) ret := make([]Rule, 0, len(rules)) for _, rule := range rules { ret = append(ret, *rule) } return ret } // ClearRules clears all the rules in flow module. func ClearRules() error { _, err := LoadRules(nil) return err } // ClearRulesOfResource clears resource level rules in flow module. func ClearRulesOfResource(res string) error { _, err := LoadRulesOfResource(res, nil) return err } func rulesFrom(m TrafficControllerMap) []*Rule { rules := make([]*Rule, 0, 8) if len(m) == 0 { return rules } for _, rs := range m { if len(rs) == 0 { continue } for _, r := range rs { if r != nil && r.BoundRule() != nil { rules = append(rules, r.BoundRule()) } } } return rules } func generateStatFor(rule *Rule) (*standaloneStatistic, error) { if !rule.needStatistic() { return nopStat, nil } intervalInMs := rule.StatIntervalInMs var retStat standaloneStatistic var resNode *stat.ResourceNode if rule.RelationStrategy == AssociatedResource { // use associated statistic resNode = stat.GetOrCreateResourceNode(rule.RefResource, base.ResTypeCommon) } else { resNode = stat.GetOrCreateResourceNode(rule.Resource, base.ResTypeCommon) } if intervalInMs == 0 || intervalInMs == config.MetricStatisticIntervalMs() { // default case, use the resource's default statistic readStat := resNode.DefaultMetric() retStat.reuseResourceStat = true retStat.readOnlyMetric = readStat retStat.writeOnlyMetric = nil return &retStat, nil } sampleCount := uint32(0) //calculate the sample count if intervalInMs > config.GlobalStatisticIntervalMsTotal() { sampleCount = 1 } else if intervalInMs < config.GlobalStatisticBucketLengthInMs() { sampleCount = 1 } else { if intervalInMs%config.GlobalStatisticBucketLengthInMs() == 0 { sampleCount = intervalInMs / config.GlobalStatisticBucketLengthInMs() } else { sampleCount = 1 } } err := base.CheckValidityForReuseStatistic(sampleCount, intervalInMs, config.GlobalStatisticSampleCountTotal(), config.GlobalStatisticIntervalMsTotal()) if err == nil { // global statistic reusable readStat, e := resNode.GenerateReadStat(sampleCount, intervalInMs) if e != nil { return nil, e } retStat.reuseResourceStat = true retStat.readOnlyMetric = readStat retStat.writeOnlyMetric = nil return &retStat, nil } else if err == base.GlobalStatisticNonReusableError { logging.Info("[FlowRuleManager] Flow rule couldn't reuse global statistic and will generate independent statistic", "rule", rule) retStat.reuseResourceStat = false realLeapArray := sbase.NewBucketLeapArray(sampleCount, intervalInMs) metricStat, e := sbase.NewSlidingWindowMetric(sampleCount, intervalInMs, realLeapArray) if e != nil { return nil, errors.Errorf("fail to generate statistic for warm up rule: %+v, err: %+v", rule, e) } retStat.readOnlyMetric = metricStat retStat.writeOnlyMetric = realLeapArray return &retStat, nil } return nil, errors.Wrapf(err, "fail to new standalone statistic because of invalid StatIntervalInMs in flow.Rule, StatIntervalInMs: %d", intervalInMs) } // SetTrafficShapingGenerator sets the traffic controller generator for the given TokenCalculateStrategy and ControlBehavior. // Note that modifying the generator of default control strategy is not allowed. func SetTrafficShapingGenerator(tokenCalculateStrategy TokenCalculateStrategy, controlBehavior ControlBehavior, generator TrafficControllerGenFunc) error { if generator == nil { return errors.New("nil generator") } if tokenCalculateStrategy >= Direct && tokenCalculateStrategy <= WarmUp { return errors.New("not allowed to replace the generator for default control strategy") } if controlBehavior >= Reject && controlBehavior <= Throttling { return errors.New("not allowed to replace the generator for default control strategy") } tcMux.Lock() defer tcMux.Unlock() tcGenFuncMap[trafficControllerGenKey{ tokenCalculateStrategy: tokenCalculateStrategy, controlBehavior: controlBehavior, }] = generator return nil } func RemoveTrafficShapingGenerator(tokenCalculateStrategy TokenCalculateStrategy, controlBehavior ControlBehavior) error { if tokenCalculateStrategy >= Direct && tokenCalculateStrategy <= WarmUp { return errors.New("not allowed to replace the generator for default control strategy") } if controlBehavior >= Reject && controlBehavior <= Throttling { return errors.New("not allowed to replace the generator for default control strategy") } tcMux.Lock() defer tcMux.Unlock() delete(tcGenFuncMap, trafficControllerGenKey{ tokenCalculateStrategy: tokenCalculateStrategy, controlBehavior: controlBehavior, }) return nil } func getTrafficControllerListFor(name string) []*TrafficShapingController { tcMux.RLock() defer tcMux.RUnlock() return tcMap[name] } func calculateReuseIndexFor(r *Rule, oldResTcs []*TrafficShapingController) (equalIdx, reuseStatIdx int) { // the index of equivalent rule in old traffic shaping controller slice equalIdx = -1 // the index of statistic reusable rule in old traffic shaping controller slice reuseStatIdx = -1 for idx, oldTc := range oldResTcs { oldRule := oldTc.BoundRule() if oldRule.isEqualsTo(r) { // break if there is equivalent rule equalIdx = idx break } // search the index of first stat reusable rule if !oldRule.isStatReusable(r) { continue } if reuseStatIdx >= 0 { // had find reuse rule. continue } reuseStatIdx = idx } return equalIdx, reuseStatIdx } // buildResourceTrafficShapingController builds TrafficShapingController slice from rules. the resource of rules must be equals to res func buildResourceTrafficShapingController(res string, rulesOfRes []*Rule, oldResTcs []*TrafficShapingController) []*TrafficShapingController { newTcsOfRes := make([]*TrafficShapingController, 0, len(rulesOfRes)) for _, rule := range rulesOfRes { if res != rule.Resource { logging.Error(errors.Errorf("unmatched resource name expect: %s, actual: %s", res, rule.Resource), "Unmatched resource name in flow.buildResourceTrafficShapingController()", "rule", rule) continue } equalIdx, reuseStatIdx := calculateReuseIndexFor(rule, oldResTcs) // First check equals scenario if equalIdx >= 0 { // reuse the old tc equalOldTc := oldResTcs[equalIdx] newTcsOfRes = append(newTcsOfRes, equalOldTc) // remove old tc from oldResTcs oldResTcs = append(oldResTcs[:equalIdx], oldResTcs[equalIdx+1:]...) continue } generator, supported := tcGenFuncMap[trafficControllerGenKey{ tokenCalculateStrategy: rule.TokenCalculateStrategy, controlBehavior: rule.ControlBehavior, }] if !supported || generator == nil { logging.Error(errors.New("unsupported flow control strategy"), "Ignoring the rule due to unsupported control behavior in flow.buildResourceTrafficShapingController()", "rule", rule) continue } var tc *TrafficShapingController var e error if reuseStatIdx >= 0 { tc, e = generator(rule, &(oldResTcs[reuseStatIdx].boundStat)) } else { tc, e = generator(rule, nil) } if tc == nil || e != nil { logging.Error(errors.New("bad generated traffic controller"), "Ignoring the rule due to bad generated traffic controller in flow.buildResourceTrafficShapingController()", "rule", rule) continue } if reuseStatIdx >= 0 { // remove old tc from oldResTcs oldResTcs = append(oldResTcs[:reuseStatIdx], oldResTcs[reuseStatIdx+1:]...) } newTcsOfRes = append(newTcsOfRes, tc) } return newTcsOfRes } // IsValidRule checks whether the given Rule is valid. func IsValidRule(rule *Rule) error { if rule == nil { return errors.New("nil Rule") } if rule.Resource == "" { return errors.New("empty Resource") } if rule.Threshold < 0 { return errors.New("negative Threshold") } if int32(rule.TokenCalculateStrategy) < 0 { return errors.New("negative TokenCalculateStrategy") } if int32(rule.ControlBehavior) < 0 { return errors.New("negative ControlBehavior") } if !(rule.RelationStrategy >= CurrentResource && rule.RelationStrategy <= AssociatedResource) { return errors.New("invalid RelationStrategy") } if rule.RelationStrategy == AssociatedResource && rule.RefResource == "" { return errors.New("RefResource must be non empty when RelationStrategy is AssociatedResource") } if rule.TokenCalculateStrategy == WarmUp { if rule.WarmUpPeriodSec <= 0 { return errors.New("WarmUpPeriodSec must be great than 0") } if rule.WarmUpColdFactor == 1 { return errors.New("WarmUpColdFactor must be great than 1") } } if rule.StatIntervalInMs > 10*60*1000 { logging.Info("StatIntervalInMs is great than 10 minutes, less than 10 minutes is recommended.") } if rule.TokenCalculateStrategy == MemoryAdaptive { if rule.LowMemUsageThreshold <= 0 { return errors.New("rule.LowMemUsageThreshold <= 0") } if rule.HighMemUsageThreshold <= 0 { return errors.New("rule.HighMemUsageThreshold <= 0") } if rule.HighMemUsageThreshold >= rule.LowMemUsageThreshold { return errors.New("rule.HighMemUsageThreshold >= rule.LowMemUsageThreshold") } if rule.MemLowWaterMarkBytes <= 0 { return errors.New("rule.MemLowWaterMarkBytes <= 0") } if rule.MemHighWaterMarkBytes <= 0 { return errors.New("rule.MemHighWaterMarkBytes <= 0") } if rule.MemHighWaterMarkBytes > int64(system_metric.TotalMemorySize) { return errors.New("rule.MemHighWaterMarkBytes should not be greater than current system's total memory size") } if rule.MemLowWaterMarkBytes >= rule.MemHighWaterMarkBytes { // can not be equal to defeat from zero overflow return errors.New("rule.MemLowWaterMarkBytes >= rule.MemHighWaterMarkBytes") } } return nil }