core/circuitbreaker/circuit_breaker.go (576 lines of code) (raw):
// Copyright 1999-2020 Alibaba Group Holding Ltd.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package circuitbreaker
import (
"reflect"
"sync/atomic"
"github.com/alibaba/sentinel-golang/core/base"
sbase "github.com/alibaba/sentinel-golang/core/stat/base"
metric_exporter "github.com/alibaba/sentinel-golang/exporter/metric"
"github.com/alibaba/sentinel-golang/logging"
"github.com/alibaba/sentinel-golang/util"
"github.com/pkg/errors"
)
// State is the state of a circuit breaker. Every breaker follows this state machine:
//
//                         switch to open based on rule
//      +----------------------------------------------------------------+
//      |                                                                |
//      |                                                                v
// +----------+   Probe succeed   +----------+        Probe         +----------+
// |          |<------------------|          |<---------------------|          |
// |  Closed  |                   | HalfOpen |                      |   Open   |
// |          |                   |          |--------------------->|          |
// +----------+                   +----------+     Probe failed     +----------+
type State int32

const (
	// Closed is the initial state: requests pass and statistics are collected.
	Closed State = iota
	// HalfOpen is entered from Open once the retry timeout has elapsed; the outcome
	// of probe requests decides whether the breaker closes again or re-opens.
	HalfOpen
	// Open rejects requests until the retry timeout has elapsed.
	Open
)
// stateChangedCounter counts circuit breaker state transitions, labelled by the
// resource name and the states before and after the transition.
var stateChangedCounter = metric_exporter.NewCounter(
	"circuit_breaker_state_changed_total",
	"Circuit breaker total state change count",
	[]string{"resource", "from_state", "to_state"},
)

// init registers stateChangedCounter with the metric exporter.
func init() {
	metric_exporter.Register(stateChangedCounter)
}
func newState() *State {
var state State
state = Closed
return &state
}
func (s *State) String() string {
switch s.get() {
case Closed:
return "Closed"
case HalfOpen:
return "HalfOpen"
case Open:
return "Open"
default:
return "Undefined"
}
}
func (s *State) get() State {
return State(atomic.LoadInt32((*int32)(s)))
}
func (s *State) set(update State) {
atomic.StoreInt32((*int32)(s), int32(update))
}
func (s *State) cas(expect State, update State) bool {
return atomic.CompareAndSwapInt32((*int32)(s), int32(expect), int32(update))
}
// StateChangeListener observes circuit breaker state transitions.
//
// Each callback is invoked after the transition has happened. It receives the
// state the breaker left (prev) and a copy of the breaker's rule. Because the
// rule is passed by value, changes a listener makes to it do not affect the
// breaker. The copy costs a little performance but shields breakers from
// misbehaving listeners.
type StateChangeListener interface {
	// OnTransformToClosed is called after a circuit breaker has switched to Closed.
	OnTransformToClosed(prev State, rule Rule)
	// OnTransformToOpen is called after a circuit breaker has switched to Open.
	// snapshot carries the value that triggered the transition, such as the
	// observed ratio or count.
	OnTransformToOpen(prev State, rule Rule, snapshot interface{})
	// OnTransformToHalfOpen is called after a circuit breaker has switched to HalfOpen.
	OnTransformToHalfOpen(prev State, rule Rule)
}
// CircuitBreaker is the contract every circuit breaking strategy implements.
type CircuitBreaker interface {
	// BoundRule returns the circuit breaking rule the breaker is bound to.
	BoundRule() *Rule
	// BoundStat returns the statistic structure the breaker records into.
	BoundStat() interface{}
	// TryPass reports whether an invocation is permitted at the moment of the call.
	TryPass(ctx *base.EntryContext) bool
	// CurrentState returns the breaker's current state.
	CurrentState() State
	// OnRequestComplete is called only for invocations that passed, once they have
	// finished. It records the response time rtt and the error (nil if none) and
	// may move the breaker to another state as a result.
	OnRequestComplete(rtt uint64, err error)
}
// ================================= circuitBreakerBase ====================================
// circuitBreakerBase holds the state and bookkeeping shared by all circuit breaker implementations.
//
// NOTE(review): nextRetryTimestampMs and curProbeNumber are accessed with 64-bit sync/atomic operations.
// On 32-bit platforms such fields must be 64-bit aligned, so think twice before reordering fields.
type circuitBreakerBase struct {
// rule is the circuit breaking rule this breaker was built from.
rule *Rule
// retryTimeoutMs is the recovery timeout in milliseconds. Each time the breaker switches to Open,
// the next probe is scheduled retryTimeoutMs after that moment (see updateNextRetryTimestamp).
// Until then no requests are permitted; afterwards the breaker may move to HalfOpen and let "trial" requests through.
retryTimeoutMs uint32
// nextRetryTimestampMs is the earliest time (milliseconds, as returned by util.CurrentTimeMillis) at which
// the breaker may leave Open and probe. Accessed atomically.
nextRetryTimestampMs uint64
// probeNumber is the number of successful requests completed while HalfOpen that are required before
// the breaker closes again. When it is 0, the first successful probe closes the breaker, and the
// TryPass implementations in this file reject every other request while HalfOpen.
probeNumber uint64
// curProbeNumber counts the successful requests completed in the current HalfOpen period. Accessed atomically.
curProbeNumber uint64
// state is the current state of the breaker's state machine, read and updated atomically.
state *State
}
// BoundRule returns the rule this circuit breaker was built from.
func (b *circuitBreakerBase) BoundRule() *Rule { return b.rule }

// CurrentState atomically reads the breaker's current state.
func (b *circuitBreakerBase) CurrentState() State { return b.state.get() }

// retryTimeoutArrived reports whether the scheduled retry time has been reached,
// i.e. whether an Open breaker may try to probe.
func (b *circuitBreakerBase) retryTimeoutArrived() bool {
	now := util.CurrentTimeMillis()
	return now >= atomic.LoadUint64(&b.nextRetryTimestampMs)
}

// updateNextRetryTimestamp schedules the next probe retryTimeoutMs from now.
func (b *circuitBreakerBase) updateNextRetryTimestamp() {
	next := util.CurrentTimeMillis() + uint64(b.retryTimeoutMs)
	atomic.StoreUint64(&b.nextRetryTimestampMs, next)
}

// addCurProbeNum atomically increments the number of successful probes in the current HalfOpen period.
func (b *circuitBreakerBase) addCurProbeNum() { atomic.AddUint64(&b.curProbeNumber, 1) }

// resetCurProbeNum atomically clears the probe counter.
func (b *circuitBreakerBase) resetCurProbeNum() { atomic.StoreUint64(&b.curProbeNumber, 0) }
// fromClosedToOpen moves the state machine from Closed to Open.
// It returns true only if the current goroutine performed the transition. In that
// case it schedules the next probe, notifies the listeners with snapshot and
// records the transition metric.
func (b *circuitBreakerBase) fromClosedToOpen(snapshot interface{}) bool {
	if !b.state.cas(Closed, Open) {
		return false
	}
	b.updateNextRetryTimestamp()
	for _, l := range stateChangeListeners {
		l.OnTransformToOpen(Closed, *b.rule, snapshot)
	}
	stateChangedCounter.Add(float64(1), b.BoundRule().Resource, "Closed", "Open")
	return true
}
// fromOpenToHalfOpen moves the state machine from Open to HalfOpen.
// It returns true only if the current goroutine performed the transition.
//
// On success the listeners are notified and the transition metric is recorded.
// When ctx carries an entry, an exit hook is also attached to it. If that entry ends
// up blocked (the probe never reached the resource), the hook rolls the breaker back
// from HalfOpen to Open. Like fromHalfOpenToOpen, the rollback clears the probe counter,
// notifies listeners and records the metric. It deliberately leaves nextRetryTimestampMs
// unchanged, so a later request may probe again right away.
func (b *circuitBreakerBase) fromOpenToHalfOpen(ctx *base.EntryContext) bool {
	if !b.state.cas(Open, HalfOpen) {
		return false
	}
	for _, listener := range stateChangeListeners {
		listener.OnTransformToHalfOpen(Open, *b.rule)
	}

	entry := ctx.Entry()
	if entry == nil {
		logging.Error(errors.New("nil entry"), "Nil entry in circuitBreakerBase.fromOpenToHalfOpen()", "rule", b.rule)
	} else {
		// If this entry carries the probe but is blocked by a later check, roll back to Open.
		// Otherwise the breaker would stay in HalfOpen with no probe in flight.
		entry.WhenExit(func(_ *base.SentinelEntry, exitCtx *base.EntryContext) error {
			if exitCtx.IsBlocked() && b.state.cas(HalfOpen, Open) {
				// Successes counted during the aborted HalfOpen period must not
				// carry over into the next one.
				b.resetCurProbeNum()
				for _, listener := range stateChangeListeners {
					listener.OnTransformToOpen(HalfOpen, *b.rule, 1.0)
				}
				stateChangedCounter.Add(float64(1), b.BoundRule().Resource, "HalfOpen", "Open")
			}
			return nil
		})
	}

	stateChangedCounter.Add(float64(1), b.BoundRule().Resource, "Open", "HalfOpen")
	return true
}
// fromHalfOpenToOpen moves the state machine from HalfOpen back to Open.
// It returns true only if the current goroutine performed the transition. In that
// case it clears the probe counter, schedules the next probe, notifies the listeners
// with snapshot and records the transition metric.
func (b *circuitBreakerBase) fromHalfOpenToOpen(snapshot interface{}) bool {
	if !b.state.cas(HalfOpen, Open) {
		return false
	}
	b.resetCurProbeNum()
	b.updateNextRetryTimestamp()
	for _, l := range stateChangeListeners {
		l.OnTransformToOpen(HalfOpen, *b.rule, snapshot)
	}
	stateChangedCounter.Add(float64(1), b.BoundRule().Resource, "HalfOpen", "Open")
	return true
}
// fromHalfOpenToClosed moves the state machine from HalfOpen to Closed.
// It returns true only if the current goroutine performed the transition. In that
// case it clears the probe counter, notifies the listeners and records the
// transition metric.
func (b *circuitBreakerBase) fromHalfOpenToClosed() bool {
	if !b.state.cas(HalfOpen, Closed) {
		return false
	}
	b.resetCurProbeNum()
	for _, l := range stateChangeListeners {
		l.OnTransformToClosed(HalfOpen, *b.rule)
	}
	stateChangedCounter.Add(float64(1), b.BoundRule().Resource, "HalfOpen", "Closed")
	return true
}
// ================================= slowRtCircuitBreaker ====================================
// slowRtCircuitBreaker opens when, within the statistic sliding window, the ratio of slow requests
// (response time above maxAllowedRt) reaches or exceeds maxSlowRequestRatio.
// NOTE(review): the embedded base carries atomically accessed uint64 fields; keep it as the first field.
type slowRtCircuitBreaker struct {
circuitBreakerBase
// stat is the sliding window of per-bucket slow/total request counters.
stat *slowRequestLeapArray
// maxAllowedRt is the response-time bound copied from Rule.MaxAllowedRtMs; a request whose rt exceeds it counts as slow.
maxAllowedRt uint64
// maxSlowRequestRatio is the slow-request ratio copied from Rule.Threshold at or above which the breaker opens.
maxSlowRequestRatio float64
// minRequestAmount is the minimum number of requests in the window before the ratio is evaluated.
minRequestAmount uint64
}
// newSlowRtCircuitBreakerWithStat builds a Closed slow-RT circuit breaker for rule r
// that records into the given, already initialised, statistic structure.
func newSlowRtCircuitBreakerWithStat(r *Rule, stat *slowRequestLeapArray) *slowRtCircuitBreaker {
	cbBase := circuitBreakerBase{
		rule:           r,
		state:          newState(),
		retryTimeoutMs: r.RetryTimeoutMs,
		probeNumber:    r.ProbeNum,
		// A zero retry timestamp means "no probe scheduled yet".
		nextRetryTimestampMs: 0,
	}
	return &slowRtCircuitBreaker{
		circuitBreakerBase:  cbBase,
		stat:                stat,
		minRequestAmount:    r.MinRequestAmount,
		maxAllowedRt:        r.MaxAllowedRtMs,
		maxSlowRequestRatio: r.Threshold,
	}
}
// newSlowRtCircuitBreaker builds a slow-RT circuit breaker for rule r together with
// its sliding-window statistic. It returns the error from sbase.NewLeapArray if the
// window cannot be created.
func newSlowRtCircuitBreaker(r *Rule) (*slowRtCircuitBreaker, error) {
	intervalMs := r.StatIntervalMs
	buckets := getRuleStatSlidingWindowBucketCount(r)
	window := new(slowRequestLeapArray)
	data, err := sbase.NewLeapArray(buckets, intervalMs, window)
	if err != nil {
		return nil, err
	}
	window.data = data
	return newSlowRtCircuitBreakerWithStat(r, window), nil
}
// BoundStat returns the sliding-window statistic (*slowRequestLeapArray) backing this breaker.
func (b *slowRtCircuitBreaker) BoundStat() interface{} {
	return b.stat
}

// TryPass decides from the breaker state alone whether a request may pass:
//   - Closed: always.
//   - Open: only if the retry timeout has elapsed and this call wins the
//     Open -> HalfOpen transition.
//   - HalfOpen: every request when probeNumber > 0, otherwise none.
func (b *slowRtCircuitBreaker) TryPass(ctx *base.EntryContext) bool {
	switch b.CurrentState() {
	case Closed:
		return true
	case Open:
		return b.retryTimeoutArrived() && b.fromOpenToHalfOpen(ctx)
	case HalfOpen:
		return b.probeNumber > 0
	default:
		return false
	}
}
// OnRequestComplete records a finished request with response time rt and drives the
// slow-RT breaker's state machine. The error argument is ignored by this strategy.
//
// The request is counted as slow when rt exceeds maxAllowedRt.
//   - Open: nothing else happens.
//   - HalfOpen: the request acts as a probe. A slow probe re-opens the breaker. A fast one
//     is counted, and once probeNumber fast probes have completed (or immediately if
//     probeNumber is 0) the breaker closes and its statistics are cleared.
//   - Closed: the breaker opens when the window holds at least minRequestAmount
//     requests and the slow ratio reaches or exceeds maxSlowRequestRatio.
func (b *slowRtCircuitBreaker) OnRequestComplete(rt uint64, _ error) {
	metricStat := b.stat
	counter, curErr := metricStat.currentCounter()
	if curErr != nil {
		logging.Error(curErr, "Fail to get current counter in slowRtCircuitBreaker#OnRequestComplete().",
			"rule", b.rule)
		return
	}
	isSlow := rt > b.maxAllowedRt
	if isSlow {
		atomic.AddUint64(&counter.slowCount, 1)
	}
	atomic.AddUint64(&counter.totalCount, 1)

	switch b.CurrentState() {
	case Open:
		return
	case HalfOpen:
		if isSlow {
			// fail to probe
			b.fromHalfOpenToOpen(1.0)
			return
		}
		b.addCurProbeNum()
		if b.probeNumber == 0 || atomic.LoadUint64(&b.curProbeNumber) >= b.probeNumber {
			// Succeed to probe. Only the goroutine that actually closes the breaker clears the
			// statistics, so a loser cannot wipe counts recorded after the winner's reset.
			if b.fromHalfOpenToClosed() {
				b.resetMetric()
			}
		}
		return
	}

	// Current state is Closed: aggregate the whole sliding window.
	slowCount, totalCount := uint64(0), uint64(0)
	for _, c := range metricStat.allCounter() {
		slowCount += atomic.LoadUint64(&c.slowCount)
		totalCount += atomic.LoadUint64(&c.totalCount)
	}
	// totalCount can be 0 if the window was reset concurrently; avoid a NaN ratio.
	if totalCount == 0 || totalCount < b.minRequestAmount {
		return
	}
	slowRatio := float64(slowCount) / float64(totalCount)
	if slowRatio > b.maxSlowRequestRatio || util.Float64Equals(slowRatio, b.maxSlowRequestRatio) {
		// Re-read the state: another goroutine may have changed it meanwhile.
		switch b.CurrentState() {
		case Closed:
			b.fromClosedToOpen(slowRatio)
		case HalfOpen:
			b.fromHalfOpenToOpen(slowRatio)
		}
	}
}
// resetMetric zeroes the slow and total counters of every bucket returned by the statistic.
func (b *slowRtCircuitBreaker) resetMetric() {
	counters := b.stat.allCounter()
	for i := range counters {
		counters[i].reset()
	}
}
// slowRequestCounter holds the per-bucket statistics of a slowRtCircuitBreaker.
// Both fields are read and written with sync/atomic operations.
type slowRequestCounter struct {
	slowCount  uint64 // completed requests whose response time exceeded the bound
	totalCount uint64 // all completed requests
}

// reset zeroes slowCount and then totalCount, each with its own atomic store.
// The pair is not reset as a single atomic unit.
func (c *slowRequestCounter) reset() {
	for _, field := range [...]*uint64{&c.slowCount, &c.totalCount} {
		atomic.StoreUint64(field, 0)
	}
}
// slowRequestLeapArray wraps the sliding window of a slowRtCircuitBreaker and acts as
// the bucket generator passed to sbase.NewLeapArray / CurrentBucket.
type slowRequestLeapArray struct {
	data *sbase.LeapArray
}

// NewEmptyBucket returns a fresh, zeroed *slowRequestCounter.
func (s *slowRequestLeapArray) NewEmptyBucket() interface{} {
	return new(slowRequestCounter)
}

// ResetBucketTo re-stamps bw with startTime and stores a fresh zeroed counter in it.
// The previous counter is replaced, not zeroed in place.
func (s *slowRequestLeapArray) ResetBucketTo(bw *sbase.BucketWrap, startTime uint64) *sbase.BucketWrap {
	atomic.StoreUint64(&bw.BucketStart, startTime)
	bw.Value.Store(new(slowRequestCounter))
	return bw
}
// currentCounter returns the slowRequestCounter of the bucket for the current time.
// It returns an error if the bucket cannot be obtained, is nil, holds no value, or
// holds a value that is not a *slowRequestCounter.
func (s *slowRequestLeapArray) currentCounter() (*slowRequestCounter, error) {
	curBucket, err := s.data.CurrentBucket(s)
	if err != nil {
		return nil, err
	}
	if curBucket == nil {
		return nil, errors.New("nil BucketWrap")
	}
	mb := curBucket.Value.Load()
	if mb == nil {
		return nil, errors.New("nil slowRequestCounter")
	}
	counter, ok := mb.(*slowRequestCounter)
	if !ok {
		// reflect.Type.Name() is "" for unnamed types such as pointers; String() always names the type.
		return nil, errors.Errorf("bucket fail to do type assert, expect: *slowRequestCounter, in fact: %s", reflect.TypeOf(mb).String())
	}
	return counter, nil
}
// allCounter returns the slowRequestCounter of every bucket returned by s.data.Values()
// (presumably the buckets within the current window; confirm against sbase.LeapArray).
// Buckets whose value is nil or of an unexpected type are logged and skipped.
func (s *slowRequestLeapArray) allCounter() []*slowRequestCounter {
	buckets := s.data.Values()
	ret := make([]*slowRequestCounter, 0, len(buckets))
	for _, b := range buckets {
		mb := b.Value.Load()
		if mb == nil {
			logging.Error(errors.New("current bucket atomic Value is nil"), "Current bucket atomic Value is nil in slowRequestLeapArray.allCounter()")
			continue
		}
		counter, ok := mb.(*slowRequestCounter)
		if !ok {
			// reflect.Type.Name() is "" for unnamed types such as pointers; String() always names the type.
			logging.Error(errors.New("bucket data type error"), "Bucket data type error in slowRequestLeapArray.allCounter()", "expect type", "*slowRequestCounter", "actual type", reflect.TypeOf(mb).String())
			continue
		}
		ret = append(ret, counter)
	}
	return ret
}
// ================================= errorRatioCircuitBreaker ====================================
// errorRatioCircuitBreaker opens when, within the statistic sliding window, the ratio of requests that
// completed with a non-nil error reaches or exceeds errorRatioThreshold.
// NOTE(review): the embedded base carries atomically accessed uint64 fields; keep it as the first field.
type errorRatioCircuitBreaker struct {
circuitBreakerBase
// minRequestAmount is the minimum number of requests in the window before the ratio is evaluated.
minRequestAmount uint64
// errorRatioThreshold is the error ratio, copied from Rule.Threshold, at or above which the breaker opens.
errorRatioThreshold float64
// stat is the sliding window of per-bucket error/total request counters.
stat *errorCounterLeapArray
}
// newErrorRatioCircuitBreakerWithStat builds a Closed error-ratio circuit breaker for
// rule r that records into the given, already initialised, statistic structure.
func newErrorRatioCircuitBreakerWithStat(r *Rule, stat *errorCounterLeapArray) *errorRatioCircuitBreaker {
	cbBase := circuitBreakerBase{
		rule:           r,
		state:          newState(),
		retryTimeoutMs: r.RetryTimeoutMs,
		probeNumber:    r.ProbeNum,
		// A zero retry timestamp means "no probe scheduled yet".
		nextRetryTimestampMs: 0,
	}
	return &errorRatioCircuitBreaker{
		circuitBreakerBase:  cbBase,
		stat:                stat,
		errorRatioThreshold: r.Threshold,
		minRequestAmount:    r.MinRequestAmount,
	}
}
// newErrorRatioCircuitBreaker builds an error-ratio circuit breaker for rule r together
// with its sliding-window statistic. It returns the error from sbase.NewLeapArray if
// the window cannot be created.
func newErrorRatioCircuitBreaker(r *Rule) (*errorRatioCircuitBreaker, error) {
	intervalMs := r.StatIntervalMs
	buckets := getRuleStatSlidingWindowBucketCount(r)
	window := new(errorCounterLeapArray)
	data, err := sbase.NewLeapArray(buckets, intervalMs, window)
	if err != nil {
		return nil, err
	}
	window.data = data
	return newErrorRatioCircuitBreakerWithStat(r, window), nil
}
// BoundStat returns the sliding-window statistic (*errorCounterLeapArray) backing this breaker.
func (b *errorRatioCircuitBreaker) BoundStat() interface{} {
	return b.stat
}

// TryPass decides from the breaker state alone whether a request may pass:
//   - Closed: always.
//   - Open: only if the retry timeout has elapsed and this call wins the
//     Open -> HalfOpen transition.
//   - HalfOpen: every request when probeNumber > 0, otherwise none.
func (b *errorRatioCircuitBreaker) TryPass(ctx *base.EntryContext) bool {
	switch b.CurrentState() {
	case Closed:
		return true
	case Open:
		return b.retryTimeoutArrived() && b.fromOpenToHalfOpen(ctx)
	case HalfOpen:
		return b.probeNumber > 0
	default:
		return false
	}
}
// OnRequestComplete records a finished request and drives the error-ratio breaker's
// state machine. A non-nil err counts as an error; the response time is ignored.
//   - Open: nothing else happens.
//   - HalfOpen: the request acts as a probe. An error re-opens the breaker. A success is
//     counted, and once probeNumber successes have completed (or immediately if
//     probeNumber is 0) the breaker closes and its statistics are cleared.
//   - Closed: the breaker opens when the window holds at least minRequestAmount
//     requests and the error ratio reaches or exceeds errorRatioThreshold.
func (b *errorRatioCircuitBreaker) OnRequestComplete(_ uint64, err error) {
	metricStat := b.stat
	counter, curErr := metricStat.currentCounter()
	if curErr != nil {
		logging.Error(curErr, "Fail to get current counter in errorRatioCircuitBreaker#OnRequestComplete().",
			"rule", b.rule)
		return
	}
	if err != nil {
		atomic.AddUint64(&counter.errorCount, 1)
	}
	atomic.AddUint64(&counter.totalCount, 1)

	switch b.CurrentState() {
	case Open:
		return
	case HalfOpen:
		if err != nil {
			// fail to probe
			b.fromHalfOpenToOpen(1.0)
			return
		}
		b.addCurProbeNum()
		if b.probeNumber == 0 || atomic.LoadUint64(&b.curProbeNumber) >= b.probeNumber {
			// Succeed to probe. Only the goroutine that actually closes the breaker clears the
			// statistics, so a loser cannot wipe counts recorded after the winner's reset.
			if b.fromHalfOpenToClosed() {
				b.resetMetric()
			}
		}
		return
	}

	// Current state is Closed: aggregate the whole sliding window.
	errorCount, totalCount := uint64(0), uint64(0)
	for _, c := range metricStat.allCounter() {
		errorCount += atomic.LoadUint64(&c.errorCount)
		totalCount += atomic.LoadUint64(&c.totalCount)
	}
	// totalCount can be 0 if the window was reset concurrently; avoid a NaN ratio.
	if totalCount == 0 || totalCount < b.minRequestAmount {
		return
	}
	errorRatio := float64(errorCount) / float64(totalCount)
	if errorRatio > b.errorRatioThreshold || util.Float64Equals(errorRatio, b.errorRatioThreshold) {
		// Re-read the state: another goroutine may have changed it meanwhile.
		switch b.CurrentState() {
		case Closed:
			b.fromClosedToOpen(errorRatio)
		case HalfOpen:
			b.fromHalfOpenToOpen(errorRatio)
		}
	}
}
// resetMetric zeroes the error and total counters of every bucket returned by the statistic.
func (b *errorRatioCircuitBreaker) resetMetric() {
	counters := b.stat.allCounter()
	for i := range counters {
		counters[i].reset()
	}
}
// errorCounter holds the per-bucket statistics shared by the error-ratio and
// error-count circuit breakers. Both fields are accessed with sync/atomic operations.
type errorCounter struct {
	errorCount uint64 // completed requests that reported a non-nil error
	totalCount uint64 // all completed requests
}

// reset zeroes errorCount and then totalCount, each with its own atomic store.
// The pair is not reset as a single atomic unit.
func (c *errorCounter) reset() {
	for _, field := range [...]*uint64{&c.errorCount, &c.totalCount} {
		atomic.StoreUint64(field, 0)
	}
}
// errorCounterLeapArray wraps the sliding window of the error-based circuit breakers
// and acts as the bucket generator passed to sbase.NewLeapArray / CurrentBucket.
type errorCounterLeapArray struct {
	data *sbase.LeapArray
}

// NewEmptyBucket returns a fresh, zeroed *errorCounter.
func (s *errorCounterLeapArray) NewEmptyBucket() interface{} {
	return new(errorCounter)
}

// ResetBucketTo re-stamps bw with startTime and stores a fresh zeroed counter in it.
// The previous counter is replaced, not zeroed in place.
func (s *errorCounterLeapArray) ResetBucketTo(bw *sbase.BucketWrap, startTime uint64) *sbase.BucketWrap {
	atomic.StoreUint64(&bw.BucketStart, startTime)
	bw.Value.Store(new(errorCounter))
	return bw
}
// currentCounter returns the errorCounter of the bucket for the current time.
// It returns an error if the bucket cannot be obtained, is nil, holds no value, or
// holds a value that is not an *errorCounter.
func (s *errorCounterLeapArray) currentCounter() (*errorCounter, error) {
	curBucket, err := s.data.CurrentBucket(s)
	if err != nil {
		return nil, err
	}
	if curBucket == nil {
		return nil, errors.New("nil BucketWrap")
	}
	mb := curBucket.Value.Load()
	if mb == nil {
		return nil, errors.New("nil errorCounter")
	}
	counter, ok := mb.(*errorCounter)
	if !ok {
		// reflect.Type.Name() is "" for unnamed types such as pointers; String() always names the type.
		return nil, errors.Errorf("bucket fail to do type assert, expect: *errorCounter, in fact: %s", reflect.TypeOf(mb).String())
	}
	return counter, nil
}
// allCounter returns the errorCounter of every bucket returned by s.data.Values()
// (presumably the buckets within the current window; confirm against sbase.LeapArray).
// Buckets whose value is nil or of an unexpected type are logged and skipped.
func (s *errorCounterLeapArray) allCounter() []*errorCounter {
	buckets := s.data.Values()
	ret := make([]*errorCounter, 0, len(buckets))
	for _, b := range buckets {
		mb := b.Value.Load()
		if mb == nil {
			logging.Error(errors.New("current bucket atomic Value is nil"), "Current bucket atomic Value is nil in errorCounterLeapArray.allCounter()")
			continue
		}
		counter, ok := mb.(*errorCounter)
		if !ok {
			// reflect.Type.Name() is "" for unnamed types such as pointers; String() always names the type.
			logging.Error(errors.New("bucket data type error"), "Bucket data type error in errorCounterLeapArray.allCounter()", "expect type", "*errorCounter", "actual type", reflect.TypeOf(mb).String())
			continue
		}
		ret = append(ret, counter)
	}
	return ret
}
// ================================= errorCountCircuitBreaker ====================================
// errorCountCircuitBreaker opens when, within the statistic sliding window, the number of requests that
// completed with a non-nil error reaches or exceeds errorCountThreshold.
// NOTE(review): the embedded base carries atomically accessed uint64 fields; keep it as the first field.
type errorCountCircuitBreaker struct {
circuitBreakerBase
// minRequestAmount is the minimum number of requests in the window before the error count is evaluated.
minRequestAmount uint64
// errorCountThreshold is Rule.Threshold converted to uint64; the conversion truncates any fractional part.
errorCountThreshold uint64
// stat is the sliding window of per-bucket error/total request counters.
stat *errorCounterLeapArray
}
// newErrorCountCircuitBreakerWithStat builds a Closed error-count circuit breaker for
// rule r that records into the given, already initialised, statistic structure.
// The float Rule.Threshold is converted to uint64, truncating any fractional part.
func newErrorCountCircuitBreakerWithStat(r *Rule, stat *errorCounterLeapArray) *errorCountCircuitBreaker {
	cbBase := circuitBreakerBase{
		rule:           r,
		state:          newState(),
		retryTimeoutMs: r.RetryTimeoutMs,
		probeNumber:    r.ProbeNum,
		// A zero retry timestamp means "no probe scheduled yet".
		nextRetryTimestampMs: 0,
	}
	return &errorCountCircuitBreaker{
		circuitBreakerBase:  cbBase,
		stat:                stat,
		errorCountThreshold: uint64(r.Threshold),
		minRequestAmount:    r.MinRequestAmount,
	}
}
// newErrorCountCircuitBreaker builds an error-count circuit breaker for rule r together
// with its sliding-window statistic. It returns the error from sbase.NewLeapArray if
// the window cannot be created.
func newErrorCountCircuitBreaker(r *Rule) (*errorCountCircuitBreaker, error) {
	intervalMs := r.StatIntervalMs
	buckets := getRuleStatSlidingWindowBucketCount(r)
	window := new(errorCounterLeapArray)
	data, err := sbase.NewLeapArray(buckets, intervalMs, window)
	if err != nil {
		return nil, err
	}
	window.data = data
	return newErrorCountCircuitBreakerWithStat(r, window), nil
}
// BoundStat returns the sliding-window statistic (*errorCounterLeapArray) backing this breaker.
func (b *errorCountCircuitBreaker) BoundStat() interface{} {
	return b.stat
}

// TryPass decides from the breaker state alone whether a request may pass:
//   - Closed: always.
//   - Open: only if the retry timeout has elapsed and this call wins the
//     Open -> HalfOpen transition.
//   - HalfOpen: every request when probeNumber > 0, otherwise none.
func (b *errorCountCircuitBreaker) TryPass(ctx *base.EntryContext) bool {
	switch b.CurrentState() {
	case Closed:
		return true
	case Open:
		return b.retryTimeoutArrived() && b.fromOpenToHalfOpen(ctx)
	case HalfOpen:
		return b.probeNumber > 0
	default:
		return false
	}
}
// OnRequestComplete records a finished request and drives the error-count breaker's
// state machine. A non-nil err counts as an error; the response time is ignored.
//   - Open: nothing else happens.
//   - HalfOpen: the request acts as a probe. An error re-opens the breaker. A success is
//     counted, and once probeNumber successes have completed (or immediately if
//     probeNumber is 0) the breaker closes and its statistics are cleared.
//   - Closed: the breaker opens when the window holds at least minRequestAmount
//     requests and the error count reaches or exceeds errorCountThreshold.
//
// The snapshot passed to listeners is always a uint64 count for this strategy.
func (b *errorCountCircuitBreaker) OnRequestComplete(_ uint64, err error) {
	metricStat := b.stat
	counter, curErr := metricStat.currentCounter()
	if curErr != nil {
		logging.Error(curErr, "Fail to get current counter in errorCountCircuitBreaker#OnRequestComplete().",
			"rule", b.rule)
		return
	}
	if err != nil {
		atomic.AddUint64(&counter.errorCount, 1)
	}
	atomic.AddUint64(&counter.totalCount, 1)

	switch b.CurrentState() {
	case Open:
		return
	case HalfOpen:
		if err != nil {
			// Fail to probe. Pass a uint64 so listeners get the same snapshot type as
			// for threshold-triggered openings (errorCount) below.
			b.fromHalfOpenToOpen(uint64(1))
			return
		}
		b.addCurProbeNum()
		if b.probeNumber == 0 || atomic.LoadUint64(&b.curProbeNumber) >= b.probeNumber {
			// Succeed to probe. Only the goroutine that actually closes the breaker clears the
			// statistics, so a loser cannot wipe counts recorded after the winner's reset.
			if b.fromHalfOpenToClosed() {
				b.resetMetric()
			}
		}
		return
	}

	// Current state is Closed: aggregate the whole sliding window.
	errorCount, totalCount := uint64(0), uint64(0)
	for _, c := range metricStat.allCounter() {
		errorCount += atomic.LoadUint64(&c.errorCount)
		totalCount += atomic.LoadUint64(&c.totalCount)
	}
	if totalCount < b.minRequestAmount {
		return
	}
	if errorCount >= b.errorCountThreshold {
		// Re-read the state: another goroutine may have changed it meanwhile.
		switch b.CurrentState() {
		case Closed:
			b.fromClosedToOpen(errorCount)
		case HalfOpen:
			b.fromHalfOpenToOpen(errorCount)
		}
	}
}
// resetMetric zeroes the error and total counters of every bucket returned by the statistic.
func (b *errorCountCircuitBreaker) resetMetric() {
	counters := b.stat.allCounter()
	for i := range counters {
		counters[i].reset()
	}
}