swim/heal_via_discover_provider.go (120 lines of code) (raw):
// Copyright (c) 2016 Uber Technologies, Inc.
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.
package swim
import (
"errors"
"math/rand"
"time"
log "github.com/uber-common/bark"
"github.com/uber/ringpop-go/logging"
"github.com/uber/ringpop-go/util"
)
// discoverProviderHealer attempts to heal a ringpop partition by consulting
// the discoverProvider for nodes that are faulty or not available in its
// membership. It attempts the heal probabilistically so that the discovery
// provider doesn't get overloaded with requests -- with default settings 6
// times per minute on average for the entire cluster.
type discoverProviderHealer struct {
// node is the local node; its discoverProvider, memberlist and clock are
// used by the healer.
node *Node
// baseProbabillity is divided by the tracked host count to obtain the
// chance of a heal attempt per period (see Probability).
baseProbabillity float64
// period is the time the healing loop waits between two evaluations.
period time.Duration
// previousHostListSize is set by Heal to the length of the host list it
// received; Probability raises it to the reachable member count when that
// is larger and never lets it drop below 1.
previousHostListSize int
// quit receives a value from Stop to make the healing loop return.
quit chan struct{}
// started is a channel of capacity 1 used as a flag: Start fills it and
// Stop drains it.
started chan struct{}
// logger is the "healer" logger tagged with the local node address.
logger log.Logger
// rand is the healer's own random source, seeded from the node clock at
// construction.
rand *rand.Rand
}
// newDiscoverProviderHealer builds a healer for node n. The healer is idle
// until Start is called.
//
// baseProbability is the numerator used by Probability and period is the time
// the healing loop waits between two evaluations. The random source is seeded
// with the current time of the node's clock.
func newDiscoverProviderHealer(n *Node, baseProbability float64, period time.Duration) *discoverProviderHealer {
	h := new(discoverProviderHealer)

	h.node = n
	h.baseProbabillity = baseProbability
	h.period = period

	h.logger = logging.Logger("healer").WithField("local", n.Address())

	// started has room for exactly one token so that Start can claim the
	// loop with a non-blocking send.
	h.started = make(chan struct{}, 1)
	h.quit = make(chan struct{})

	seed := n.clock.Now().UnixNano()
	h.rand = rand.New(rand.NewSource(seed))

	return h
}
// Start launches the partition healing loop in its own goroutine. It does
// nothing when a loop is already running.
//
// After every period the loop draws a random number and calls Heal when the
// draw is below Probability. The loop returns once Stop signals it.
func (h *discoverProviderHealer) Start() {
	// A successful non-blocking send on started claims the loop; when the
	// channel is already full a loop is running and there is nothing to do.
	select {
	case h.started <- struct{}{}:
	default:
		return
	}

	loop := func() {
		for {
			// wait for the next tick, or leave when asked to quit
			select {
			case <-h.quit:
				return
			case <-h.node.clock.After(h.period):
			}

			// attempt a heal with the current probability
			draw := h.rand.Float64()
			if draw < h.Probability() {
				// the loop has no use for the healed targets or the error
				_, _ = h.Heal()
			}
		}
	}
	go loop()
}
// Stop ends the partition healing loop. It is a no-op when the loop was not
// started.
//
// The quit channel is unbuffered, so Stop returns only after the loop has
// received the signal.
func (h *discoverProviderHealer) Stop() {
	// Take the token placed by Start; without one there is no loop to stop.
	select {
	case <-h.started:
	default:
		return
	}

	h.quit <- struct{}{}
}
// Probability returns the probability with which a heal should be attempted.
// We want to throttle the heal attempts to alleviate pressure on the discover
// provider, so the base probability is divided by the tracked host count.
//
// The tracked host count (previousHostListSize) is updated as a side effect:
// it is raised to the number of reachable members when that is larger, and it
// is never allowed to be smaller than 1.
func (h *discoverProviderHealer) Probability() float64 {
	// Read the count once: membership may change between two calls, and the
	// value that is compared must be the value that is stored.
	reachable := h.node.CountReachableMembers()
	if h.previousHostListSize < reachable {
		h.previousHostListSize = reachable
	}

	// avoid division by zero.
	if h.previousHostListSize < 1 {
		h.previousHostListSize = 1
	}

	return h.baseProbabillity / float64(h.previousHostListSize)
}
// Heal iterates over the hostList that the discoverProvider provides. If the
// node encounters a host that is faulty or not in the membership, we pick that
// node as a target to perform a partition heal with.
//
// A DiscoHealEvent is emitted on every call. An error is returned, together
// with an empty slice, when no discoverProvider is configured or when the
// provider fails to produce a host list. Otherwise the returned slice holds
// the addresses of the targets for which AttemptHeal succeeded; failed
// attempts are logged and the function gives up after 10 of them.
func (h *discoverProviderHealer) Heal() ([]string, error) {
	// maxFailures bounds the number of failed heal attempts per call.
	const maxFailures = 10

	h.node.EmitEvent(DiscoHealEvent{})

	// get list from discovery provider
	if h.node.discoverProvider == nil {
		return []string{}, errors.New("discoverProvider not available to healer")
	}

	hostList, err := h.node.discoverProvider.Hosts()
	if err != nil {
		// include the cause so the failure can be diagnosed from the logs
		h.logger.WithField("error", err.Error()).Warn("healer unable to receive host list from discover provider")
		return []string{}, err
	}

	// remembered for the throttling done in Probability
	h.previousHostListSize = len(hostList)

	// collect the targets this node might want to heal with: hosts that are
	// unknown to the membership or whose status is faulty or beyond
	var targets []string
	for _, address := range hostList {
		m, ok := h.node.memberlist.Member(address)
		if !ok || statePrecedence(m.Status) >= statePrecedence(Faulty) {
			targets = append(targets, address)
		}
	}

	util.ShuffleStringsInPlace(targets)

	// filter hosts that we already know about and attempt to heal nodes that
	// are complementary to the membership of this node.
	var ret []string
	failures := 0
	for len(targets) != 0 && failures < maxFailures {
		target := targets[0]
		targets = del(targets, target)

		// try to heal partition
		hostsOnOtherSide, err := AttemptHeal(h.node, target)
		if err != nil {
			h.logger.WithFields(log.Fields{
				"error":   err.Error(),
				"failure": failures,
			}).Warn("heal attempt failed (10 in total)")
			failures++
			continue
		}

		// hosts reported by the successful attempt need no attempt of their own
		for _, host := range hostsOnOtherSide {
			targets = del(targets, host)
		}

		ret = append(ret, target)
	}

	if failures == maxFailures {
		h.logger.WithField("reachedNodes", len(ret)).Warn("healer reached max failures")
	}

	return ret, nil
}
// del removes every occurrence of s from strs and returns the shortened
// slice. The removal happens in place: a matching element is overwritten with
// the current last element and the slice is cut by one, so the backing array
// of the argument is modified and the order of the remaining elements is not
// preserved.
func del(strs []string, s string) []string {
	idx := 0
	for idx < len(strs) {
		if strs[idx] == s {
			last := len(strs) - 1
			strs[idx] = strs[last]
			strs = strs[:last]
			// stay on idx: the element moved here has not been examined yet
			continue
		}
		idx++
	}
	return strs
}