core/circuitbreaker/rule_manager.go (372 lines of code) (raw):
// Copyright 1999-2020 Alibaba Group Holding Ltd.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package circuitbreaker
import (
"fmt"
"reflect"
"sync"
"github.com/pkg/errors"
"github.com/alibaba/sentinel-golang/logging"
"github.com/alibaba/sentinel-golang/util"
)
// CircuitBreakerGenFunc builds a CircuitBreaker for the rule r.
// reuseStat is the statistic structure of a previous circuit breaker that the
// new one may take over, or nil when a fresh statistic should be created.
type CircuitBreakerGenFunc func(r *Rule, reuseStat interface{}) (CircuitBreaker, error)
// Package-level state of the circuit breaker rule manager.
var (
// cbGenFuncMap maps a strategy to the generator that builds its circuit breaker.
cbGenFuncMap = make(map[Strategy]CircuitBreakerGenFunc, 4)
// breakerRules holds, per resource, the rules that passed validation.
// It is read and written while holding updateMux.
breakerRules = make(map[string][]*Rule)
// breakers holds, per resource, the circuit breakers built from breakerRules.
// It is read and written while holding updateMux.
breakers = make(map[string][]CircuitBreaker)
// updateMux guards breakers and breakerRules; it is also taken when the
// generator map is modified through Set/RemoveCircuitBreakerGenerator.
updateMux = new(sync.RWMutex)
// currentRules keeps the raw (unvalidated) rules of the last load, keyed by
// resource. It is compared against incoming rules to skip redundant loads.
currentRules = make(map[string][]*Rule, 0)
// updateRuleMux serializes LoadRules and LoadRulesOfResource.
updateRuleMux = new(sync.Mutex)
// stateChangeListeners are the listeners added by RegisterStateChangeListeners.
// No lock protects this slice.
stateChangeListeners = make([]StateChangeListener, 0)
)
// init registers the generators of the built-in circuit breaking strategies
// (SlowRequestRatio, ErrorRatio and ErrorCount).
//
// Every generator follows the same contract:
//   - a nil rule is rejected with an error;
//   - a nil reuseStat produces a circuit breaker with a fresh statistic;
//   - a reuseStat of the concrete type expected by the strategy is reused;
//   - any other reuseStat is reported with a warning and a circuit breaker
//     with a fresh statistic is built instead.
func init() {
	cbGenFuncMap[SlowRequestRatio] = func(r *Rule, reuseStat interface{}) (CircuitBreaker, error) {
		if r == nil {
			return nil, errors.New("nil rule")
		}
		if reuseStat == nil {
			return newSlowRtCircuitBreaker(r)
		}
		stat, ok := reuseStat.(*slowRequestLeapArray)
		if !ok || stat == nil {
			// Report the dynamic type of what was actually supplied. reuseStat is
			// non-nil here, so reflect.TypeOf never returns nil; String() is used
			// because Name() is empty for pointer types.
			logging.Warn("[CircuitBreaker RuleManager] Expect to generate circuit breaker with reuse statistic, but fail to do type assertion, expect:*slowRequestLeapArray", "statType", reflect.TypeOf(reuseStat).String())
			return newSlowRtCircuitBreaker(r)
		}
		return newSlowRtCircuitBreakerWithStat(r, stat), nil
	}

	cbGenFuncMap[ErrorRatio] = func(r *Rule, reuseStat interface{}) (CircuitBreaker, error) {
		if r == nil {
			return nil, errors.New("nil rule")
		}
		if reuseStat == nil {
			return newErrorRatioCircuitBreaker(r)
		}
		stat, ok := reuseStat.(*errorCounterLeapArray)
		if !ok || stat == nil {
			// See the SlowRequestRatio generator for why reuseStat is inspected.
			logging.Warn("[CircuitBreaker RuleManager] Expect to generate circuit breaker with reuse statistic, but fail to do type assertion, expect:*errorCounterLeapArray", "statType", reflect.TypeOf(reuseStat).String())
			return newErrorRatioCircuitBreaker(r)
		}
		return newErrorRatioCircuitBreakerWithStat(r, stat), nil
	}

	cbGenFuncMap[ErrorCount] = func(r *Rule, reuseStat interface{}) (CircuitBreaker, error) {
		if r == nil {
			return nil, errors.New("nil rule")
		}
		if reuseStat == nil {
			return newErrorCountCircuitBreaker(r)
		}
		stat, ok := reuseStat.(*errorCounterLeapArray)
		if !ok || stat == nil {
			// See the SlowRequestRatio generator for why reuseStat is inspected.
			logging.Warn("[CircuitBreaker RuleManager] Expect to generate circuit breaker with reuse statistic, but fail to do type assertion, expect:*errorCounterLeapArray", "statType", reflect.TypeOf(reuseStat).String())
			return newErrorCountCircuitBreaker(r)
		}
		return newErrorCountCircuitBreakerWithStat(r, stat), nil
	}
}
// GetRulesOfResource returns copies of the rules currently loaded for the
// given resource, or nil when the resource has no entry.
//
// Because the result consists of copies, modifying it has no effect on the
// circuit breaker module. The call takes the module's global read lock and
// copies every rule, so avoid calling it frequently.
func GetRulesOfResource(resource string) []Rule {
	updateMux.RLock()
	loaded, found := breakerRules[resource]
	updateMux.RUnlock()

	if !found {
		return nil
	}
	copies := make([]Rule, len(loaded))
	for i, p := range loaded {
		copies[i] = *p
	}
	return copies
}
// GetRules returns copies of all rules currently loaded, across resources.
//
// Because the result consists of copies, modifying it has no effect on the
// circuit breaker module. The call takes the module's global read lock and
// copies every rule, so avoid calling it when possible.
func GetRules() []Rule {
	updateMux.RLock()
	loaded := rulesFrom(breakerRules)
	updateMux.RUnlock()

	copies := make([]Rule, len(loaded))
	for i, p := range loaded {
		copies[i] = *p
	}
	return copies
}
// ClearRules removes all previously loaded rules by loading an empty rule set.
func ClearRules() error {
	if _, err := LoadRules(nil); err != nil {
		return err
	}
	return nil
}
// LoadRules replaces the old rules with the given circuit breaking rules.
//
// The rules are grouped by resource and compared with the rules of the
// previous load; when they are deeply equal nothing is done. Nil entries in
// rules are ignored with a warning instead of causing a panic.
//
// Return values:
//
//	bool:  whether the load was performed (false when the rules are unchanged)
//	error: the error reported while applying the rules, if any
func LoadRules(rules []*Rule) (bool, error) {
	resRulesMap := make(map[string][]*Rule, 16)
	for _, rule := range rules {
		if rule == nil {
			// A nil rule carries no resource name to group by; IsValidRule would
			// reject it anyway, so drop it here rather than dereferencing it.
			logging.Warn("[CircuitBreaker] Ignoring nil rule when loading rules")
			continue
		}
		resRulesMap[rule.Resource] = append(resRulesMap[rule.Resource], rule)
	}

	updateRuleMux.Lock()
	defer updateRuleMux.Unlock()

	if reflect.DeepEqual(currentRules, resRulesMap) {
		logging.Info("[CircuitBreaker] Load rules is the same with current rules, so ignore load operation.")
		return false, nil
	}

	err := onRuleUpdate(resRulesMap)
	return true, err
}
// LoadRulesOfResource replaces the circuit breaker rules of the resource res
// with the given rules. An empty rule slice clears the resource.
//
// The first return value tells whether a load was actually performed; it is
// false when res is empty or when the rules equal the ones loaded previously
// for that resource.
func LoadRulesOfResource(res string, rules []*Rule) (bool, error) {
	if res == "" {
		return false, errors.New("empty resource")
	}

	updateRuleMux.Lock()
	defer updateRuleMux.Unlock()

	if len(rules) > 0 {
		// Load path: skip the work when nothing changed for this resource.
		if reflect.DeepEqual(currentRules[res], rules) {
			logging.Info("[CircuitBreaker] Load resource level rules is the same with current resource level rules, so ignore load operation.")
			return false, nil
		}
		return true, onResourceRuleUpdate(res, rules)
	}

	// Clear path: forget the raw rules first, then drop the resource's breakers
	// and validated rules under the module lock.
	delete(currentRules, res)

	updateMux.Lock()
	delete(breakers, res)
	delete(breakerRules, res)
	updateMux.Unlock()

	logging.Info("[CircuitBreaker] clear resource level rules", "resource", res)
	return true, nil
}
// getBreakersOfResource returns a copy of the circuit breaker slice registered
// for the resource. The result is never nil; it is empty when the resource has
// no circuit breakers.
func getBreakersOfResource(resource string) []CircuitBreaker {
	updateMux.RLock()
	registered := breakers[resource]
	updateMux.RUnlock()

	snapshot := make([]CircuitBreaker, len(registered))
	copy(snapshot, registered)
	return snapshot
}
// calculateReuseIndexFor scans the old circuit breakers for candidates that the
// rule r can reuse.
//
// equalIdx is the index of the first old circuit breaker whose bound rule
// equals r, or -1 if there is none; the scan stops as soon as one is found.
// reuseStatIdx is the index of the first old circuit breaker, seen before the
// scan stopped, whose statistic is reusable for r, or -1 if there is none.
func calculateReuseIndexFor(r *Rule, oldResCbs []CircuitBreaker) (equalIdx, reuseStatIdx int) {
	equalIdx, reuseStatIdx = -1, -1

	for i := range oldResCbs {
		bound := oldResCbs[i].BoundRule()
		if bound.isEqualsTo(r) {
			// An equivalent rule ends the search.
			return i, reuseStatIdx
		}
		// Only the first statistic-reusable candidate is remembered.
		if bound.isStatReusable(r) && reuseStatIdx < 0 {
			reuseStatIdx = i
		}
	}
	return equalIdx, reuseStatIdx
}
// onRuleUpdate replaces the whole rule set of the module with rawResRulesMap.
//
// Invalid rules are logged and dropped. Circuit breakers are rebuilt per
// resource from the remaining rules, working on a copy of the current breakers,
// and then swapped in together with the validated rules under updateMux. The
// raw map is recorded in currentRules afterwards. A panic raised along the way
// is recovered and returned as the error.
func onRuleUpdate(rawResRulesMap map[string][]*Rule) (err error) {
	defer func() {
		p := recover()
		if p == nil {
			return
		}
		if asErr, isErr := p.(error); isErr {
			err = asErr
		} else {
			err = fmt.Errorf("%+v", p)
		}
	}()

	// Keep only the rules that pass validation; resources left without any
	// valid rule are omitted entirely.
	validResRulesMap := make(map[string][]*Rule, len(rawResRulesMap))
	for res, candidates := range rawResRulesMap {
		accepted := make([]*Rule, 0, len(candidates))
		for _, candidate := range candidates {
			if invalid := IsValidRule(candidate); invalid != nil {
				logging.Warn("[CircuitBreaker onRuleUpdate] Ignoring invalid circuit breaking rule when loading new rules", "rule", candidate, "err", invalid.Error())
				continue
			}
			accepted = append(accepted, candidate)
		}
		if len(accepted) == 0 {
			continue
		}
		validResRulesMap[res] = accepted
	}

	start := util.CurrentTimeNano()

	// Copy the current breakers so they can be consumed without holding the lock.
	updateMux.RLock()
	breakersClone := make(map[string][]CircuitBreaker, len(validResRulesMap))
	for res, existing := range breakers {
		dup := make([]CircuitBreaker, len(existing))
		copy(dup, existing)
		breakersClone[res] = dup
	}
	updateMux.RUnlock()

	newBreakers := make(map[string][]CircuitBreaker, len(validResRulesMap))
	for res, accepted := range validResRulesMap {
		if built := BuildResourceCircuitBreaker(res, accepted, breakersClone[res]); len(built) > 0 {
			newBreakers[res] = built
		}
	}

	updateMux.Lock()
	breakerRules = validResRulesMap
	breakers = newBreakers
	updateMux.Unlock()

	currentRules = rawResRulesMap

	logging.Debug("[CircuitBreaker onRuleUpdate] Time statistics(ns) for updating circuit breaker rule", "timeCost", util.CurrentTimeNano()-start)
	LogRuleUpdate(validResRulesMap)
	return nil
}
// onResourceRuleUpdate replaces the rules and circuit breakers of the single
// resource res with the ones derived from rawResRules.
//
// Invalid rules are logged and dropped. Circuit breakers are built only from
// the rules that passed validation, so the breakers and the stored rules of the
// resource always describe the same rule set. When no circuit breaker results,
// the resource is removed from the module. The raw rules are recorded in
// currentRules afterwards. A panic raised along the way is recovered and
// returned as the error.
func onResourceRuleUpdate(res string, rawResRules []*Rule) (err error) {
	defer func() {
		if r := recover(); r != nil {
			var ok bool
			err, ok = r.(error)
			if !ok {
				err = fmt.Errorf("%v", r)
			}
		}
	}()

	validResRules := make([]*Rule, 0, len(rawResRules))
	for _, rule := range rawResRules {
		if err := IsValidRule(rule); err != nil {
			logging.Warn("[CircuitBreaker onResourceRuleUpdate] Ignoring invalid circuitBreaker rule", "rule", rule, "reason", err.Error())
			continue
		}
		validResRules = append(validResRules, rule)
	}

	start := util.CurrentTimeNano()

	// Work on a private copy: BuildResourceCircuitBreaker removes entries from
	// the old breaker slice it is given.
	oldResCbs := make([]CircuitBreaker, 0)
	updateMux.RLock()
	oldResCbs = append(oldResCbs, breakers[res]...)
	updateMux.RUnlock()

	// Build from the validated rules only; the raw slice may contain nil or
	// otherwise invalid rules that were rejected above.
	newCbsOfRes := BuildResourceCircuitBreaker(res, validResRules, oldResCbs)

	updateMux.Lock()
	if len(newCbsOfRes) == 0 {
		delete(breakerRules, res)
		delete(breakers, res)
	} else {
		breakerRules[res] = validResRules
		breakers[res] = newCbsOfRes
	}
	updateMux.Unlock()

	currentRules[res] = rawResRules

	logging.Debug("[CircuitBreaker onResourceRuleUpdate] Time statistics(ns) for updating circuit breaker rule", "timeCost", util.CurrentTimeNano()-start)
	logging.Info("[CircuitBreaker] load resource level rules", "resource", res, "validResRules", validResRules)
	return nil
}
// rulesFrom flattens a resource-to-rules map into a single slice, skipping nil
// rules. The result is never nil. Its order follows map iteration and is
// therefore not deterministic across resources.
func rulesFrom(rm map[string][]*Rule) []*Rule {
	flat := make([]*Rule, 0, 8)
	for _, group := range rm {
		for _, rule := range group {
			if rule == nil {
				continue
			}
			flat = append(flat, rule)
		}
	}
	return flat
}
// LogRuleUpdate writes an info log describing the rule set m: either that the
// rules were cleared (no non-nil rule present) or the list of loaded rules.
func LogRuleUpdate(m map[string][]*Rule) {
	loaded := rulesFrom(m)
	if len(loaded) > 0 {
		logging.Info("[CircuitBreakerRuleManager] Circuit breaking rules were loaded", "rules", loaded)
		return
	}
	logging.Info("[CircuitBreakerRuleManager] Circuit breaking rules were cleared")
}
// RegisterStateChangeListeners appends the given listeners to the global state
// change listeners shared by all circuit breakers. Calling it without
// listeners is a no-op.
// Note: this function is not thread-safe.
func RegisterStateChangeListeners(listeners ...StateChangeListener) {
	if len(listeners) > 0 {
		stateChangeListeners = append(stateChangeListeners, listeners...)
	}
}
// ClearStateChangeListeners clears the all StateChangeListener
// Note: this function is not thread-safe.
func ClearStateChangeListeners() {
stateChangeListeners = make([]StateChangeListener, 0)
}
// SetCircuitBreakerGenerator registers generator as the circuit breaker
// generator of strategy s.
//
// It returns an error when generator is nil, or when s is one of the default
// strategies (any value not greater than ErrorCount), whose generators may not
// be replaced.
func SetCircuitBreakerGenerator(s Strategy, generator CircuitBreakerGenFunc) error {
	switch {
	case generator == nil:
		return errors.New("nil generator")
	case s <= ErrorCount:
		return errors.New("not allowed to replace the generator for default circuit breaking strategies")
	}

	updateMux.Lock()
	defer updateMux.Unlock()

	cbGenFuncMap[s] = generator
	return nil
}
// RemoveCircuitBreakerGenerator unregisters the circuit breaker generator of
// strategy s.
//
// It returns an error when s is one of the default strategies (any value not
// greater than ErrorCount), whose generators may not be removed.
func RemoveCircuitBreakerGenerator(s Strategy) error {
	if s > ErrorCount {
		updateMux.Lock()
		defer updateMux.Unlock()

		delete(cbGenFuncMap, s)
		return nil
	}
	return errors.New("not allowed to remove the generator for default circuit breaking strategies")
}
// ClearRulesOfResource removes the resource level rules of res from the
// circuit breaker module by loading an empty rule set for that resource.
func ClearRulesOfResource(res string) error {
	if _, err := LoadRulesOfResource(res, nil); err != nil {
		return err
	}
	return nil
}
// BuildResourceCircuitBreaker builds the circuit breakers of resource res from
// rulesOfRes, reusing entries of oldResCbs where possible.
//
// For every rule, in order:
//   - a nil rule, or a rule whose Resource differs from res, is logged and skipped;
//   - if an old circuit breaker is bound to an equal rule, that breaker is reused as is;
//   - otherwise the generator of the rule's strategy builds a new breaker,
//     taking over the statistic of the first statistic-reusable old breaker if
//     there is one; rules without a registered generator, or whose generator
//     fails or yields nil, are logged and skipped.
//
// An old circuit breaker is used for at most one rule: once consumed it is
// removed from oldResCbs. The removal happens in place, so the backing array of
// the slice passed in is modified; callers that still need it must pass a copy.
func BuildResourceCircuitBreaker(res string, rulesOfRes []*Rule, oldResCbs []CircuitBreaker) []CircuitBreaker {
	newCbsOfRes := make([]CircuitBreaker, 0, len(rulesOfRes))
	for _, r := range rulesOfRes {
		if r == nil {
			// Guard the dereference below; a nil rule can never produce a breaker.
			logging.Warn("[CircuitBreaker BuildResourceCircuitBreaker] Ignoring nil rule")
			continue
		}
		if res != r.Resource {
			logging.Error(errors.Errorf("unmatched resource name expect: %s, actual: %s", res, r.Resource), "Unmatched resource name in circuitBreaker.BuildResourceCircuitBreaker()", "rule", r)
			continue
		}

		equalIdx, reuseStatIdx := calculateReuseIndexFor(r, oldResCbs)

		// First check equals scenario: reuse the old circuit breaker untouched.
		if equalIdx >= 0 {
			newCbsOfRes = append(newCbsOfRes, oldResCbs[equalIdx])
			// Remove the consumed breaker so it cannot be matched again.
			oldResCbs = append(oldResCbs[:equalIdx], oldResCbs[equalIdx+1:]...)
			continue
		}

		generator := cbGenFuncMap[r.Strategy]
		if generator == nil {
			logging.Warn("[CircuitBreaker BuildResourceCircuitBreaker] Ignoring the rule due to unsupported circuit breaking strategy", "rule", r)
			continue
		}

		var cb CircuitBreaker
		var e error
		if reuseStatIdx >= 0 {
			cb, e = generator(r, oldResCbs[reuseStatIdx].BoundStat())
		} else {
			cb, e = generator(r, nil)
		}
		if cb == nil || e != nil {
			// A generator may return a nil breaker without an error, so the
			// error must not be dereferenced unconditionally.
			errMsg := "generator returned nil circuit breaker"
			if e != nil {
				errMsg = e.Error()
			}
			logging.Warn("[CircuitBreaker BuildResourceCircuitBreaker] Ignoring the rule due to bad generated circuit breaker", "rule", r, "err", errMsg)
			continue
		}

		// The statistic now belongs to the new breaker; drop its previous owner.
		if reuseStatIdx >= 0 {
			oldResCbs = append(oldResCbs[:reuseStatIdx], oldResCbs[reuseStatIdx+1:]...)
		}
		newCbsOfRes = append(newCbsOfRes, cb)
	}
	return newCbsOfRes
}
// IsValidRule checks whether r is an acceptable circuit breaking rule and
// returns an error describing the first violated constraint, or nil.
//
// A rule is rejected when it is nil, has an empty resource name, a
// non-positive StatIntervalMs or RetryTimeoutMs, a negative Threshold, or a
// Threshold above 1.0 for the SlowRequestRatio and ErrorRatio strategies.
// A StatIntervalMs that is not a multiple of a non-zero
// StatSlidingWindowBucketCount only produces a warning; the rule is still
// reported as valid.
func IsValidRule(r *Rule) error {
	// Cases are evaluated top to bottom, so the nil check protects the rest.
	switch {
	case r == nil:
		return errors.New("nil Rule")
	case len(r.Resource) == 0:
		return errors.New("empty resource name")
	case r.StatIntervalMs <= 0:
		return errors.New("invalid StatIntervalMs")
	case r.RetryTimeoutMs <= 0:
		return errors.New("invalid RetryTimeoutMs")
	case r.Threshold < 0.0:
		return errors.New("invalid Threshold")
	case r.Strategy == SlowRequestRatio && r.Threshold > 1.0:
		return errors.New("invalid slow request ratio threshold (valid range: [0.0, 1.0])")
	case r.Strategy == ErrorRatio && r.Threshold > 1.0:
		return errors.New("invalid error ratio threshold (valid range: [0.0, 1.0])")
	}

	if bucketCnt := r.StatSlidingWindowBucketCount; bucketCnt != 0 && r.StatIntervalMs%bucketCnt != 0 {
		logging.Warn("[CircuitBreaker IsValidRule] The following must be true: StatIntervalMs % StatSlidingWindowBucketCount == 0. StatSlidingWindowBucketCount will be replaced by 1", "rule", r)
	}
	return nil
}