swim/state_transitions.go (150 lines of code) (raw):
// Copyright (c) 2015 Uber Technologies, Inc.
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.
package swim
import (
"sync"
"time"
"github.com/benbjohnson/clock"
"github.com/uber-common/bark"
"github.com/uber/ringpop-go/logging"
"github.com/uber/ringpop-go/util"
)
// subject is an interface to define the subject (eg. member) to transition state for. This interface allows to pass in both a Member and a Change struct to the schedule function.
type subject interface {
address() string
incarnation() int64
}
type transitionTimer struct {
*clock.Timer
// state represents the state the subject was in when the transition was scheduled
state string
}
// stateTransitions handles the timers for state transitions in SWIM
type stateTransitions struct {
sync.Mutex
node *Node
timeouts StateTimeouts
timers map[string]*transitionTimer
enabled bool
logger bark.Logger
}
// StateTimeouts contains the configured timeouts for every state before transitioning to the new state
type StateTimeouts struct {
// Suspect is the timeout it takes a node in suspect mode to transition to faulty
Suspect time.Duration
// Faulty is the timeout it takes a node in faulty mode to transition to tombstone
Faulty time.Duration
// Tombstone is the timeout it takes a node in tombstone mode to be evicted
Tombstone time.Duration
}
func mergeStateTimeouts(one StateTimeouts, two StateTimeouts) StateTimeouts {
return StateTimeouts{
Suspect: util.SelectDuration(one.Suspect, two.Suspect),
Faulty: util.SelectDuration(one.Faulty, two.Faulty),
Tombstone: util.SelectDuration(one.Tombstone, two.Tombstone),
}
}
// newStateTransitions returns a new state transition controller that can be used to schedule state transitions for nodes
func newStateTransitions(n *Node, timeouts StateTimeouts) *stateTransitions {
return &stateTransitions{
node: n,
timeouts: timeouts,
timers: make(map[string]*transitionTimer),
enabled: true,
logger: logging.Logger("stateTransitions").WithField("local", n.Address()),
}
}
// ScheduleSuspectToFaulty starts the suspect timer. After the Suspect timeout the node will be declared faulty
func (s *stateTransitions) ScheduleSuspectToFaulty(subject subject) {
s.Lock()
s.schedule(subject, Suspect, s.timeouts.Suspect, func() {
// transition the subject to faulty
s.node.memberlist.MakeFaulty(subject.address(), subject.incarnation())
})
s.Unlock()
}
// ScheduleFaultyToTombstone starts the faulty timer. After the Faulty timeout the node will be declared tombstone
func (s *stateTransitions) ScheduleFaultyToTombstone(subject subject) {
s.Lock()
s.schedule(subject, Faulty, s.timeouts.Faulty, func() {
// transition the subject to tombstone
s.node.memberlist.MakeTombstone(subject.address(), subject.incarnation())
})
s.Unlock()
}
// ScheduleTombstoneToEvict starts the tombstone timer. After the Faulty timeout the node will be evicted
func (s *stateTransitions) ScheduleTombstoneToEvict(subject subject) {
s.Lock()
s.schedule(subject, Tombstone, s.timeouts.Tombstone, func() {
// transition the subject to tombstone
s.node.memberlist.Evict(subject.address())
})
s.Unlock()
}
func (s *stateTransitions) schedule(subject subject, state string, timeout time.Duration, transition func()) {
if !s.enabled {
s.logger.WithField("member", subject.address()).Warn("cannot schedule a state transition while disabled")
return
}
if s.node.Address() == subject.address() {
s.logger.WithField("member", subject.address()).Debug("cannot schedule a state transition for the local member")
return
}
if timer, ok := s.timers[subject.address()]; ok {
if timer.state == state {
s.logger.WithFields(bark.Fields{
"member": subject.address(),
"state": state,
}).Warn("redundant call to schedule a state transition for member, ignored")
return
}
// cancel the previously scheduled transition for the subject
timer.Stop()
}
timer := s.node.clock.AfterFunc(timeout, func() {
s.logger.WithFields(bark.Fields{
"member": subject.address(),
"state": state,
}).Info("executing scheduled transition for member")
// execute the transition
transition()
})
s.timers[subject.address()] = &transitionTimer{
Timer: timer,
state: state,
}
s.logger.WithFields(bark.Fields{
"member": subject.address(),
"state": state,
}).Debug("scheduled state transition for member")
}
// Cancel cancels the scheduled transition for the subject
func (s *stateTransitions) Cancel(subject subject) {
s.Lock()
if timer, ok := s.timers[subject.address()]; ok {
timer.Stop()
delete(s.timers, subject.address())
s.logger.WithFields(bark.Fields{
"member": subject.address(),
"state": timer.state,
}).Debug("stopped scheduled state transition for member")
}
s.Unlock()
}
// Enable enables state transition controller. The transition controller needs to be in enabled state to allow transitions to be scheduled.
func (s *stateTransitions) Enable() {
s.Lock()
if s.enabled {
s.logger.Warn("state transition controller already enabled")
s.Unlock()
return
}
s.enabled = true
s.Unlock()
s.logger.Info("enabled state transition controller")
}
// Disable cancels all scheduled state transitions and disables the state transition controller for further use
func (s *stateTransitions) Disable() {
s.Lock()
if !s.enabled {
s.logger.Warn("state transition controller already disabled")
s.Unlock()
return
}
s.enabled = false
numTimers := len(s.timers)
for address, timer := range s.timers {
timer.Stop()
delete(s.timers, address)
}
s.Unlock()
s.logger.WithField("timersStopped", numTimers).Info("disabled state transition controller")
}
// timer is a testing func to avoid data races
func (s *stateTransitions) timer(address string) *clock.Timer {
s.Lock()
t, ok := s.timers[address]
s.Unlock()
if !ok {
return nil
}
return t.Timer
}