core/outlier/retryer.go (115 lines of code) (raw):
// Copyright 1999-2020 Alibaba Group Holding Ltd.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package outlier
import (
"errors"
"fmt"
"net"
"sync"
"time"
"github.com/alibaba/sentinel-golang/logging"
)
// Package-level retryer state.
var (
	// retryers maps a resource name to its node Retryer. Entries are created
	// lazily by getRetryerOfResource; access is guarded by retryerMutex.
	retryers = map[string]*Retryer{}
	// retryerMutex serializes every read and write of retryers.
	retryerMutex = &sync.Mutex{}
	// retryerCh feeds retry tasks to the background consumer started in init.
	// Its buffer size is the package-level capacity.
	retryerCh = make(chan task, capacity)
)
func init() {
go func() {
defer func() {
if err := recover(); err != nil {
logging.Error(fmt.Errorf("%+v", err), "Unexpected panic when consuming retryerCh")
}
}()
for task := range retryerCh {
retryer := getRetryerOfResource(task.resource)
retryer.scheduleNodes(task.nodes)
}
}()
}
// Retryer proactively re-checks the failed nodes of a single resource so that
// they can be recovered. Each service (resource) should have its own Retryer;
// getRetryerOfResource creates one per resource and caches it.
type Retryer struct {
	// resource is the name of the resource (service) this Retryer belongs to.
	resource string
	// interval is the base retry interval. After a failed check the next check
	// is delayed by interval multiplied by the (capped) retry count.
	interval time.Duration // initial value of the retry interval
	// maxAttempts caps the multiplier applied to interval after failures. It
	// does not limit how many checks are performed.
	maxAttempts uint32
	// counts maps a node address (as passed to checkFunc) to its retried count.
	// A node present here already has a pending check chain. Guarded by mtx.
	counts map[string]uint32 // node address ---> retried count
	// checkFunc is the probe connectNode uses to test whether a node is
	// reachable again.
	checkFunc RecoveryCheckFunc
	// mtx protects counts.
	mtx sync.Mutex
}
// getRetryerOfResource returns the Retryer for resource. On first use it
// creates one from the resource's outlier rule and caches it.
//
// checkFunc defaults to isPortOpen and is replaced only by a non-nil
// rule.RecoveryCheckFunc, so a cached Retryer never holds a nil probe.
// Calling a nil probe would panic inside a timer goroutine and crash the
// process.
//
// If no outlier rule exists, the error is logged and a Retryer with a zero
// interval and zero maxAttempts is still created and cached.
// NOTE(review): that Retryer will re-probe failed nodes without delay; consider
// whether callers should ever reach here without a rule.
func getRetryerOfResource(resource string) *Retryer {
	retryerMutex.Lock()
	defer retryerMutex.Unlock()

	if retryer, ok := retryers[resource]; ok {
		return retryer
	}

	retryer := &Retryer{
		resource:  resource,
		counts:    make(map[string]uint32),
		checkFunc: isPortOpen,
	}
	if rule := getOutlierRuleOfResource(resource); rule == nil {
		logging.Error(errors.New("nil outlier rule"), "Nil outlier rule in getRetryerOfResource()")
	} else {
		retryer.maxAttempts = rule.MaxRecoveryAttempts
		// Convert to Duration before scaling: multiplying by 1e6 in the field's
		// own type (e.g. uint32) would overflow for intervals above ~4294 ms.
		retryer.interval = time.Duration(rule.RecoveryIntervalMs) * time.Millisecond
		if rule.RecoveryCheckFunc != nil {
			retryer.checkFunc = rule.RecoveryCheckFunc
		}
	}
	retryers[resource] = retryer
	return retryer
}
// isPortOpen reports whether a TCP connection to address (host:port) can be
// established within 5 seconds. A successful probe connection is closed
// immediately.
func isPortOpen(address string) bool {
	conn, dialErr := net.DialTimeout("tcp", address, 5*time.Second)
	if dialErr != nil {
		return false
	}
	// Probe only: a failure to close does not change the reachability result.
	_ = conn.Close()
	return true
}
// scheduleNodes starts a recovery check chain for every node in nodes that is
// not already being retried. A newly scheduled node gets a retry count of 1,
// and its first connectNode check fires after r.interval. Nodes that already
// have an entry in r.counts are skipped.
func (r *Retryer) scheduleNodes(nodes []string) {
	r.mtx.Lock()
	defer r.mtx.Unlock()

	for i := range nodes {
		target := nodes[i] // per-iteration variable, safe to capture below
		if _, retrying := r.counts[target]; retrying {
			continue
		}
		r.counts[target] = 1
		logging.Info("[Outlier Retryer] Reconnecting...", "node", target)
		time.AfterFunc(r.interval, func() { r.connectNode(target) })
	}
}
// connectNode runs one recovery probe against node using r.checkFunc.
// If the check fails it calls onDisconnected. If it passes it calls
// onConnected with the probe's duration in whole milliseconds.
func (r *Retryer) connectNode(node string) {
	begin := time.Now()
	if !r.checkFunc(node) {
		r.onDisconnected(node)
		return
	}
	elapsedMs := uint64(time.Since(begin).Milliseconds())
	r.onConnected(node, elapsedMs)
}
// onConnected handles a passing recovery probe for node. It:
//   - removes node from r.counts, ending its retry chain;
//   - asks the resource's recycler to recover node;
//   - completes a request with latency rt and a nil error on the node's
//     breaker.
//
// If no breaker exists for node, a warning is logged instead.
func (r *Retryer) onConnected(node string, rt uint64) {
	r.mtx.Lock()
	delete(r.counts, node)
	r.mtx.Unlock()

	getRecyclerOfResource(r.resource).recover(node)

	breaker, found := getNodeBreakersOfResource(r.resource)[node]
	if !found {
		logging.Warn("[Outlier Retryer] Failed to update status after reconnection", "node", node)
		return
	}
	breaker.OnRequestComplete(rt, nil)
}
// onDisconnected handles a failed recovery probe for node. It increments the
// node's retry count and schedules the next connectNode check after
// r.interval multiplied by that count. The multiplier is capped at
// r.maxAttempts and floored at 1.
//
// NOTE(review): a node that stays unreachable is retried indefinitely. An
// earlier note here suggested deleting the node from r.counts after repeated
// failed checks, but that policy is not implemented.
func (r *Retryer) onDisconnected(node string) {
	r.mtx.Lock()
	r.counts[node]++
	multiplier := r.counts[node]
	if multiplier > r.maxAttempts {
		multiplier = r.maxAttempts
	}
	r.mtx.Unlock()

	// A zero multiplier (maxAttempts == 0, or counter wrap-around) would
	// schedule the next probe with no delay and spin in a tight retry loop.
	if multiplier == 0 {
		multiplier = 1
	}
	time.AfterFunc(r.interval*time.Duration(multiplier), func() {
		r.connectNode(node)
	})
}