core/outlier/rule_manager.go (249 lines of code) (raw):
// Copyright 1999-2020 Alibaba Group Holding Ltd.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package outlier
import (
"errors"
"fmt"
"reflect"
"sync"
"github.com/alibaba/sentinel-golang/core/circuitbreaker"
"github.com/alibaba/sentinel-golang/logging"
"github.com/alibaba/sentinel-golang/util"
)
var (
// resource name ---> outlier ejection rule
outlierRules = make(map[string]*Rule)
// resource name ---> circuitbreaker rule
breakerRules = make(map[string]*circuitbreaker.Rule)
// resource name ---> address ---> circuitbreaker
nodeBreakers = make(map[string]map[string]circuitbreaker.CircuitBreaker)
// resource name ---> outlier ejection rule
currentRules = make(map[string]*Rule)
updateMux = new(sync.RWMutex)
updateRuleMux = new(sync.Mutex)
)
func getNodeBreakersOfResource(resource string) map[string]circuitbreaker.CircuitBreaker {
updateMux.RLock()
nodes := nodeBreakers[resource]
updateMux.RUnlock()
ret := make(map[string]circuitbreaker.CircuitBreaker, len(nodes))
for address, breaker := range nodes {
ret[address] = breaker
}
return ret
}
func deleteNodeBreakerOfResource(resource string, address string) {
updateMux.Lock()
defer updateMux.Unlock()
if _, ok := nodeBreakers[resource]; ok {
delete(nodeBreakers[resource], address)
logging.Info("[Outlier] delete node breaker", "resourceName", resource, "address", address)
}
}
func addNodeBreakerOfResource(resource string, address string) {
newBreakers := circuitbreaker.BuildResourceCircuitBreaker(resource,
[]*circuitbreaker.Rule{getBreakerRuleOfResource(resource)}, []circuitbreaker.CircuitBreaker{})
if len(newBreakers) > 0 {
updateMux.Lock()
if nodeBreakers[resource] == nil {
nodeBreakers[resource] = make(map[string]circuitbreaker.CircuitBreaker)
}
nodeBreakers[resource][address] = newBreakers[0]
updateMux.Unlock()
logging.Info("[Outlier] add node breaker", "resourceName", resource, "address", address)
}
}
func getOutlierRuleOfResource(resource string) *Rule {
updateMux.RLock()
rule := outlierRules[resource]
updateMux.RUnlock()
return rule
}
func getBreakerRuleOfResource(resource string) *circuitbreaker.Rule {
updateMux.RLock()
rule := breakerRules[resource]
updateMux.RUnlock()
return rule
}
// GetRules returns all the rules based on copy.
// It doesn't take effect for outlier ejection module if user changes the rule.
// GetRules need to compete outlier ejection module's global lock and the high performance losses of copy,
//
// reduce or do not call GetRules if possible
func GetRules() []Rule {
updateMux.RLock()
rules := rulesFrom(outlierRules)
updateMux.RUnlock()
ret := make([]Rule, 0, len(rules))
for _, rule := range rules {
ret = append(ret, *rule)
}
return ret
}
func rulesFrom(rm map[string]*Rule) []*Rule {
rules := make([]*Rule, 0, 8)
if len(rm) == 0 {
return rules
}
for _, r := range rm {
if r != nil {
rules = append(rules, r)
}
}
return rules
}
// ClearRules clear all the previous rules.
func ClearRules() error {
_, err := LoadRules(nil)
return err
}
// LoadRules replaces old outlier ejection rules with the given rules.
//
// return value:
//
// bool: was designed to indicate whether the internal map has been changed
// error: was designed to indicate whether occurs the error.
func LoadRules(rules []*Rule) (bool, error) {
rulesMap := make(map[string]*Rule, 16)
for _, rule := range rules {
rulesMap[rule.Resource] = rule
}
updateRuleMux.Lock()
defer updateRuleMux.Unlock()
isEqual := reflect.DeepEqual(currentRules, rulesMap)
if isEqual {
logging.Info("[Outlier] Load rules is the same with current rules, so ignore load operation.")
return false, nil
}
err := onRuleUpdate(rulesMap)
return true, err
}
// LoadRuleOfResource loads the given resource's outlier ejection rule to the rule manager, while previous resource's rule will be replaced.
// the first returned value indicates whether do real load operation, if the rule is the same with previous resource's rule, return false
func LoadRuleOfResource(res string, rule *Rule) (bool, error) {
if len(res) == 0 {
return false, errors.New("empty resource")
}
updateRuleMux.Lock()
defer updateRuleMux.Unlock()
// clear resource rule
if rule == nil {
delete(currentRules, res)
updateMux.Lock()
delete(nodeBreakers, res)
delete(breakerRules, res)
delete(outlierRules, res)
updateMux.Unlock()
logging.Info("[Outlier] clear resource level rule", "resource", res)
return true, nil
}
// load resource level rule
isEqual := reflect.DeepEqual(currentRules[res], rule)
if isEqual {
logging.Info("[Outlier] Load resource level rule is the same with current resource level rule, so ignore load operation.")
return false, nil
}
err := onResourceRuleUpdate(res, rule)
return true, err
}
func onResourceRuleUpdate(res string, rule *Rule) (err error) {
defer func() {
if r := recover(); r != nil {
var ok bool
err, ok = r.(error)
if !ok {
err = fmt.Errorf("%v", r)
}
}
}()
circuitRule := rule.Rule
if err = IsValidRule(rule); err != nil {
logging.Warn("[Outlier onResourceRuleUpdate] Ignoring invalid outlier ejection rule", "rule", rule, "err", err.Error())
return
}
if err = circuitbreaker.IsValidRule(circuitRule); err != nil {
logging.Warn("[Outlier onRuleUpdate] Ignoring invalid rule when loading new rules", "rule", rule, "err", err.Error())
return
}
start := util.CurrentTimeNano()
breakers := getNodeBreakersOfResource(res)
newBreakers := make(map[string]circuitbreaker.CircuitBreaker)
for address, breaker := range breakers {
newCbsOfRes := circuitbreaker.BuildResourceCircuitBreaker(res,
[]*circuitbreaker.Rule{circuitRule}, []circuitbreaker.CircuitBreaker{breaker})
if len(newCbsOfRes) > 0 {
newBreakers[address] = newCbsOfRes[0]
}
}
updateMux.Lock()
outlierRules[res] = rule
breakerRules[res] = circuitRule
nodeBreakers[res] = newBreakers
updateMux.Unlock()
currentRules[res] = rule
logging.Debug("[Outlier onResourceRuleUpdate] Time statistics(ns) for updating outlier ejection rule", "timeCost", util.CurrentTimeNano()-start)
logging.Info("[Outlier] load resource level rule", "resource", res, "rule", rule)
return nil
}
// onRuleUpdate is concurrent safe to update outlier ejection rules
func onRuleUpdate(rulesMap map[string]*Rule) (err error) {
defer func() {
if r := recover(); r != nil {
var ok bool
err, ok = r.(error)
if !ok {
err = fmt.Errorf("%+v", r)
}
}
}()
// ignore invalid outlier ejection rule
validCircuitRulesMap := make(map[string]*circuitbreaker.Rule, len(rulesMap))
validRulesMap := make(map[string]*Rule, len(rulesMap))
for resource, rule := range rulesMap {
circuitRule := rule.Rule
if err = IsValidRule(rule); err != nil {
logging.Warn("[Outlier onRuleUpdate] Ignoring invalid rule when loading new rules", "rule", rule, "err", err.Error())
continue
}
if err = circuitbreaker.IsValidRule(circuitRule); err != nil {
logging.Warn("[Outlier onRuleUpdate] Ignoring invalid rule when loading new rules", "rule", rule, "err", err.Error())
continue
}
validCircuitRulesMap[resource] = circuitRule
validRulesMap[resource] = rule
}
currentRules = rulesMap
updateMux.Lock()
breakerRules = validCircuitRulesMap
outlierRules = validRulesMap
updateMux.Unlock()
updateAllBreakers()
LogRuleUpdate(outlierRules)
return nil
}
// ClearRuleOfResource clears resource level rule in outlier ejection module.
func ClearRuleOfResource(res string) error {
_, err := LoadRuleOfResource(res, nil)
return err
}
func IsValidRule(r *Rule) error {
if r == nil {
return errors.New("nil Rule")
}
if len(r.Resource) == 0 {
return errors.New("empty resource name")
}
if r.MaxEjectionPercent < 0.0 || r.MaxEjectionPercent > 1.0 {
return errors.New("invalid MaxEjectionPercent")
}
return nil
}
func updateAllBreakers() {
start := util.CurrentTimeNano()
updateMux.RLock()
breakersClone := make(map[string]map[string]circuitbreaker.CircuitBreaker, len(nodeBreakers))
for resource, breakers := range nodeBreakers {
breakersClone[resource] = make(map[string]circuitbreaker.CircuitBreaker)
for address, breaker := range breakers {
breakersClone[resource][address] = breaker
}
}
updateMux.RUnlock()
newBreakers := make(map[string]map[string]circuitbreaker.CircuitBreaker, len(breakerRules))
for resource, rule := range breakerRules {
newBreakers[resource] = make(map[string]circuitbreaker.CircuitBreaker)
for address, breaker := range breakersClone[resource] {
newCbsOfRes := circuitbreaker.BuildResourceCircuitBreaker(resource,
[]*circuitbreaker.Rule{rule}, []circuitbreaker.CircuitBreaker{breaker})
if len(newCbsOfRes) > 0 {
newBreakers[resource][address] = newCbsOfRes[0]
}
}
}
updateMux.Lock()
nodeBreakers = newBreakers
updateMux.Unlock()
logging.Debug("[Outlier onRuleUpdate] Time statistics(ns) for updating all circuit breakers", "timeCost", util.CurrentTimeNano()-start)
}
func LogRuleUpdate(rules map[string]*Rule) {
if len(rules) == 0 {
logging.Info("[OutlierRuleManager] Outlier ejection rules were cleared")
} else {
logging.Info("[OutlierRuleManager] Outlier ejection rules were loaded", "rules", rules)
}
}