core/base/slot_chain.go (143 lines of code) (raw):

// Copyright 1999-2020 Alibaba Group Holding Ltd. // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. package base import ( "sort" "sync" "github.com/alibaba/sentinel-golang/logging" "github.com/alibaba/sentinel-golang/util" "github.com/pkg/errors" ) type BaseSlot interface { // Order returns the sort value of the slot. // SlotChain will sort all it's slots by ascending sort value in each bucket // (StatPrepareSlot bucket、RuleCheckSlot bucket and StatSlot bucket) Order() uint32 } // StatPrepareSlot is responsible for some preparation before statistic // For example: init structure and so on type StatPrepareSlot interface { BaseSlot // Prepare function do some initialization // Such as: init statistic structure、node and etc // The result of preparing would store in EntryContext // All StatPrepareSlots execute in sequence // Prepare function should not throw panic. Prepare(ctx *EntryContext) } // RuleCheckSlot is rule based checking strategy // All checking rule must implement this interface. type RuleCheckSlot interface { BaseSlot // Check function do some validation // It can break off the slot pipeline // Each TokenResult will return check result // The upper logic will control pipeline according to SlotResult. Check(ctx *EntryContext) *TokenResult } // StatSlot is responsible for counting all custom biz metrics. // StatSlot would not handle any panic, and pass up all panic to slot chain type StatSlot interface { BaseSlot // OnEntryPass function will be invoked when StatPrepareSlots and RuleCheckSlots execute pass // StatSlots will do some statistic logic, such as QPS、log、etc OnEntryPassed(ctx *EntryContext) // OnEntryBlocked function will be invoked when StatPrepareSlots and RuleCheckSlots fail to execute // It may be inbound flow control or outbound cir // StatSlots will do some statistic logic, such as QPS、log、etc // blockError introduce the block detail OnEntryBlocked(ctx *EntryContext, blockError *BlockError) // OnCompleted function will be invoked when chain exits. // The semantics of OnCompleted is the entry passed and completed // Note: blocked entry will not call this function OnCompleted(ctx *EntryContext) } // SlotChain hold all system slots and customized slot. // SlotChain support plug-in slots developed by developer. type SlotChain struct { // statPres is in ascending order by StatPrepareSlot.Order() value. statPres []StatPrepareSlot // ruleChecks is in ascending order by RuleCheckSlot.Order() value. ruleChecks []RuleCheckSlot // stats is in ascending order by StatSlot.Order() value. stats []StatSlot // EntryContext Pool, used for reuse EntryContext object ctxPool *sync.Pool } var ( ctxPool = &sync.Pool{ New: func() interface{} { ctx := NewEmptyEntryContext() ctx.RuleCheckResult = NewTokenResultPass() ctx.Data = make(map[interface{}]interface{}) ctx.Input = &SentinelInput{ BatchCount: 1, Flag: 0, Args: make([]interface{}, 0), Attachments: make(map[interface{}]interface{}), } return ctx }, } ) func NewSlotChain() *SlotChain { return &SlotChain{ statPres: make([]StatPrepareSlot, 0, 8), ruleChecks: make([]RuleCheckSlot, 0, 8), stats: make([]StatSlot, 0, 8), ctxPool: ctxPool, } } // Get an EntryContext from EntryContext ctxPool, if ctxPool doesn't have enough EntryContext then new one. func (sc *SlotChain) GetPooledContext() *EntryContext { ctx := sc.ctxPool.Get().(*EntryContext) ctx.startTime = util.CurrentTimeMillis() return ctx } func (sc *SlotChain) RefurbishContext(c *EntryContext) { if c != nil { c.Reset() sc.ctxPool.Put(c) } } // AddStatPrepareSlot adds the StatPrepareSlot slot to the StatPrepareSlot list of the SlotChain. // All StatPrepareSlot in the list will be sorted according to StatPrepareSlot.Order() in ascending order. // AddStatPrepareSlot is non-thread safe, // In concurrency scenario, AddStatPrepareSlot must be guarded by SlotChain.RWMutex#Lock func (sc *SlotChain) AddStatPrepareSlot(s StatPrepareSlot) { sc.statPres = append(sc.statPres, s) sort.SliceStable(sc.statPres, func(i, j int) bool { return sc.statPres[i].Order() < sc.statPres[j].Order() }) } // AddRuleCheckSlot adds the RuleCheckSlot to the RuleCheckSlot list of the SlotChain. // All RuleCheckSlot in the list will be sorted according to RuleCheckSlot.Order() in ascending order. // AddRuleCheckSlot is non-thread safe, // In concurrency scenario, AddRuleCheckSlot must be guarded by SlotChain.RWMutex#Lock func (sc *SlotChain) AddRuleCheckSlot(s RuleCheckSlot) { sc.ruleChecks = append(sc.ruleChecks, s) sort.SliceStable(sc.ruleChecks, func(i, j int) bool { return sc.ruleChecks[i].Order() < sc.ruleChecks[j].Order() }) } // AddStatSlot adds the StatSlot to the StatSlot list of the SlotChain. // All StatSlot in the list will be sorted according to StatSlot.Order() in ascending order. // AddStatSlot is non-thread safe, // In concurrency scenario, AddStatSlot must be guarded by SlotChain.RWMutex#Lock func (sc *SlotChain) AddStatSlot(s StatSlot) { sc.stats = append(sc.stats, s) sort.SliceStable(sc.stats, func(i, j int) bool { return sc.stats[i].Order() < sc.stats[j].Order() }) } // The entrance of slot chain // Return the TokenResult and nil if internal panic. func (sc *SlotChain) Entry(ctx *EntryContext) *TokenResult { // This should not happen, unless there are errors existing in Sentinel internal. // If happened, need to add TokenResult in EntryContext defer func() { if err := recover(); err != nil { logging.Error(errors.Errorf("%+v", err), "Sentinel internal panic in SlotChain.Entry()") ctx.SetError(errors.Errorf("%+v", err)) return } }() // execute prepare slot sps := sc.statPres if len(sps) > 0 { for _, s := range sps { s.Prepare(ctx) } } // execute rule based checking slot rcs := sc.ruleChecks var ruleCheckRet *TokenResult if len(rcs) > 0 { for _, s := range rcs { sr := s.Check(ctx) if sr == nil { // nil equals to check pass continue } // check slot result if sr.IsBlocked() { ruleCheckRet = sr break } } } if ruleCheckRet == nil { ctx.RuleCheckResult.ResetToPass() } else { ctx.RuleCheckResult = ruleCheckRet } // execute statistic slot ss := sc.stats ruleCheckRet = ctx.RuleCheckResult if len(ss) > 0 { for _, s := range ss { // indicate the result of rule based checking slot. if !ruleCheckRet.IsBlocked() { s.OnEntryPassed(ctx) } else { // The block error should not be nil. s.OnEntryBlocked(ctx, ruleCheckRet.blockErr) } } } return ruleCheckRet } func (sc *SlotChain) exit(ctx *EntryContext) { if ctx == nil || ctx.Entry() == nil { logging.Error(errors.New("entryContext or SentinelEntry is nil"), "EntryContext or SentinelEntry is nil in SlotChain.exit()", "ctx", ctx) return } // The OnCompleted is called only when entry passed if ctx.IsBlocked() { return } for _, s := range sc.stats { s.OnCompleted(ctx) } // relieve the context here }