// swim/node.go
// Copyright (c) 2015 Uber Technologies, Inc.
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.
package swim
import (
"errors"
"sync"
"time"
"github.com/uber/ringpop-go/membership"
"github.com/uber/ringpop-go/discovery"
"github.com/uber/ringpop-go/events"
"github.com/benbjohnson/clock"
"github.com/rcrowley/go-metrics"
log "github.com/uber-common/bark"
"github.com/uber/ringpop-go/logging"
"github.com/uber/ringpop-go/shared"
"github.com/uber/ringpop-go/util"
)
var (
	// ErrNodeNotReady is returned when a remote request is being handled
	// while the node is not yet ready, i.e. before a successful Bootstrap
	// has marked the node ready.
	ErrNodeNotReady = errors.New("node is not ready to handle requests")
)
// Options is a configuration struct passed to the NewNode constructor.
// Zero-valued fields are replaced with defaults by mergeDefaultOptions.
type Options struct {
	// StateTimeouts configures the timeouts used when scheduling state
	// transitions for members (suspect, faulty, tombstone).
	StateTimeouts StateTimeouts

	// MinProtocolPeriod is passed to the gossip sub-protocol; presumably
	// the lower bound on a protocol period — confirm in gossip.go.
	MinProtocolPeriod time.Duration

	// JoinTimeout bounds individual join attempts; PingTimeout bounds a
	// direct ping; PingRequestTimeout bounds an indirect ping request.
	JoinTimeout, PingTimeout, PingRequestTimeout time.Duration

	// PingRequestSize is the number of helper nodes asked to perform an
	// indirect ping when a direct ping fails.
	PingRequestSize int

	// RollupFlushInterval and RollupMaxUpdates configure update rollup;
	// NOTE(review): not consumed in this file — verify against the
	// rollup implementation.
	RollupFlushInterval time.Duration
	RollupMaxUpdates    int

	// MaxReverseFullSyncJobs limits concurrent reverse full-sync jobs.
	MaxReverseFullSyncJobs int

	// When started, the partition healing algorithm attempts a partition heal
	// every PartitionHealPeriod with a probability of:
	// PartitionHealBaseProbabillity / # Nodes in discoverProvider.
	//
	// When in a 100 node cluster BaseProbabillity = 3 and Period = 30s,
	// every 30 seconds a node will have a probability of 3/100 to start the
	// partition healing procedure. This means that for the entire cluster
	// the discover provider receives 6 calls per minute on average.
	PartitionHealPeriod           time.Duration
	PartitionHealBaseProbabillity float64

	// LabelLimits restricts the labels that can be set on this node.
	LabelLimits LabelOptions

	// InitialLabels are applied to the local member at creation time.
	InitialLabels LabelMap

	// Clock is used to generate incarnation numbers; defaults to the
	// system clock (clock.New()).
	Clock clock.Clock

	// SelfEvict configures the self-eviction sequence.
	SelfEvict SelfEvictOptions

	// If set to true, ping requests without app name return error
	RequiresAppInPing bool
}
// defaultOptions returns the Options used for any setting the caller
// leaves unspecified.
func defaultOptions() *Options {
	return &Options{
		StateTimeouts: StateTimeouts{
			Suspect:   5 * time.Second,
			Faulty:    24 * time.Hour,
			Tombstone: 1 * time.Minute,
		},
		MinProtocolPeriod:             200 * time.Millisecond,
		JoinTimeout:                   1000 * time.Millisecond,
		PingTimeout:                   1500 * time.Millisecond,
		PingRequestTimeout:            5000 * time.Millisecond,
		PingRequestSize:               3,
		RollupFlushInterval:           5000 * time.Millisecond,
		RollupMaxUpdates:              250,
		PartitionHealPeriod:           30 * time.Second,
		PartitionHealBaseProbabillity: 3,
		LabelLimits:                   DefaultLabelOptions,
		Clock:                         clock.New(),
		MaxReverseFullSyncJobs:        5,
		SelfEvict: SelfEvictOptions{
			PingRatio: .4,
		},
	}
}
// mergeDefaultOptions fills any unspecified settings in opts with the
// values from defaultOptions. Passing nil yields the full default set.
func mergeDefaultOptions(opts *Options) *Options {
	defaults := defaultOptions()
	if opts == nil {
		return defaults
	}

	// Composite settings have dedicated mergers.
	opts.StateTimeouts = mergeStateTimeouts(opts.StateTimeouts, defaults.StateTimeouts)
	opts.LabelLimits = mergeLabelOptions(opts.LabelLimits, defaults.LabelLimits)
	opts.SelfEvict = mergeSelfEvictOptions(opts.SelfEvict, defaults.SelfEvict)

	// Scalar settings fall back to the default when left at their zero
	// value (util.Select* semantics).
	opts.MinProtocolPeriod = util.SelectDuration(opts.MinProtocolPeriod, defaults.MinProtocolPeriod)
	opts.JoinTimeout = util.SelectDuration(opts.JoinTimeout, defaults.JoinTimeout)
	opts.PingTimeout = util.SelectDuration(opts.PingTimeout, defaults.PingTimeout)
	opts.PingRequestTimeout = util.SelectDuration(opts.PingRequestTimeout, defaults.PingRequestTimeout)
	opts.RollupFlushInterval = util.SelectDuration(opts.RollupFlushInterval, defaults.RollupFlushInterval)
	opts.PartitionHealPeriod = util.SelectDuration(opts.PartitionHealPeriod, defaults.PartitionHealPeriod)
	opts.PartitionHealBaseProbabillity = util.SelectFloat(opts.PartitionHealBaseProbabillity, defaults.PartitionHealBaseProbabillity)
	opts.RollupMaxUpdates = util.SelectInt(opts.RollupMaxUpdates, defaults.RollupMaxUpdates)
	opts.PingRequestSize = util.SelectInt(opts.PingRequestSize, defaults.PingRequestSize)
	opts.MaxReverseFullSyncJobs = util.SelectInt(opts.MaxReverseFullSyncJobs, defaults.MaxReverseFullSyncJobs)

	if opts.Clock == nil {
		opts.Clock = defaults.Clock
	}
	return opts
}
// NodeInterface specifies the public-facing methods that a SWIM Node
// implements.
type NodeInterface interface {
	// Bootstrap joins the node to a cluster and returns the list of
	// nodes that were joined.
	Bootstrap(opts *BootstrapOptions) ([]string, error)

	// CountReachableMembers counts the reachable members that satisfy
	// all given predicates.
	CountReachableMembers(predicates ...MemberPredicate) int

	// Destroy stops the SWIM protocol and all sub-protocols.
	Destroy()

	// GetChecksum returns a checksum; presumably of the membership —
	// confirm against the implementation (not visible in this file).
	GetChecksum() uint32

	// GetReachableMembers returns the reachable members that satisfy
	// all given predicates.
	GetReachableMembers(predicates ...MemberPredicate) []Member

	// Labels returns a mutator for the labels of the local node.
	Labels() *NodeLabels

	// MemberStats and ProtocolStats expose runtime statistics.
	MemberStats() MemberStats
	ProtocolStats() ProtocolStats

	// Ready reports whether the node has bootstrapped and is ready for use.
	Ready() bool

	AddListener(events.EventListener) bool
	RemoveListener(events.EventListener) bool

	// swim.SelfEvict
	// mockery has troubles generating a working mock when the interface is
	// embedded therefore the definitions are copied here.
	RegisterSelfEvictHook(hooks SelfEvictHook) error
	SelfEvict() error

	// SetIdentity changes the identity of the local node to a different
	// identity
	SetIdentity(identity string) error
}
// A Node is a SWIM member
type Node struct {
	events.SyncEventEmitter

	app     string
	service string
	address string

	// state guards the node's lifecycle flags under its embedded RWMutex.
	state struct {
		stopped, destroyed, pinging, ready bool
		sync.RWMutex
	}

	channel          shared.SubChannel
	discoverProvider discovery.DiscoverProvider
	memberlist       *memberlist
	memberiter       memberIter
	disseminator     *disseminator
	stateTransitions *stateTransitions
	gossip           *gossip

	// When we get more healer strategies we can abstract to a healer interface.
	healer *discoverProviderHealer

	// Timeouts used by the ping/join protocol (see pingNextMember and
	// Bootstrap).
	joinTimeout, pingTimeout, pingRequestTimeout time.Duration

	// pingRequestSize is the number of helper nodes used for an indirect
	// ping when a direct ping fails.
	pingRequestSize int

	maxReverseFullSyncJobs int

	clientRate metrics.Meter
	serverRate metrics.Meter
	totalRate  metrics.Meter

	// startTime is set when Bootstrap succeeds.
	startTime time.Time

	logger log.Logger

	labelLimits LabelOptions

	// clock is used to generate incarnation numbers; it is typically the
	// system clock, wrapped via clock.New()
	clock clock.Clock

	selfEvict        *selfEvict
	selfEvictOptions SelfEvictOptions

	// requiresAppInPing, when true, makes ping requests without an app
	// name return an error (see Options.RequiresAppInPing).
	requiresAppInPing bool
}
// NewNode returns a new SWIM Node.
func NewNode(app, address string, channel shared.SubChannel, opts *Options) *Node {
	// use defaults for options that are unspecified
	opts = mergeDefaultOptions(opts)

	n := &Node{
		address:                address,
		app:                    app,
		channel:                channel,
		logger:                 logging.Logger("node").WithField("local", address),
		joinTimeout:            opts.JoinTimeout,
		pingTimeout:            opts.PingTimeout,
		pingRequestTimeout:     opts.PingRequestTimeout,
		pingRequestSize:        opts.PingRequestSize,
		maxReverseFullSyncJobs: opts.MaxReverseFullSyncJobs,
		clientRate:             metrics.NewMeter(),
		serverRate:             metrics.NewMeter(),
		totalRate:              metrics.NewMeter(),
		clock:                  opts.Clock,
	}

	n.selfEvict = newSelfEvict(n, opts.SelfEvict)
	n.requiresAppInPing = opts.RequiresAppInPing
	n.labelLimits = opts.LabelLimits

	// The memberlist is created first; the iterator, state transitions,
	// healer, gossip and disseminator below all hang off the node.
	n.memberlist = newMemberlist(n, opts.InitialLabels)
	n.memberiter = newMemberlistIter(n.memberlist)
	n.stateTransitions = newStateTransitions(n, opts.StateTimeouts)
	n.healer = newDiscoverProviderHealer(
		n,
		opts.PartitionHealBaseProbabillity,
		opts.PartitionHealPeriod,
	)
	n.gossip = newGossip(n, opts.MinProtocolPeriod)
	n.disseminator = newDisseminator(n)

	// Seed the disseminator with a change for every initial member.
	for _, m := range n.memberlist.GetMembers() {
		var change Change
		change.populateSubject(&m)
		n.disseminator.RecordChange(change)
	}

	if n.channel != nil {
		n.registerHandlers()
		n.service = n.channel.ServiceName()
	}

	return n
}
// Address returns the host:port address this SWIM node was created with.
func (n *Node) Address() string {
	return n.address
}
// App returns the application name this Node was created with.
func (n *Node) App() string {
	return n.app
}
// HasChanges reports whether the Node's disseminator still holds changes
// to be gossiped to other members.
func (n *Node) HasChanges() bool {
	return n.disseminator.HasChanges()
}
// Incarnation returns the incarnation number of the local member, or -1
// when the node's membership has not been initialized yet.
func (n *Node) Incarnation() int64 {
	if n.memberlist == nil {
		return -1
	}

	// The local member is read under the members lock; checking it for
	// nil before acquiring the lock (as previously done) races with any
	// writer publishing it under the same lock.
	n.memberlist.members.RLock()
	defer n.memberlist.members.RUnlock()

	if n.memberlist.local == nil {
		return -1
	}
	return n.memberlist.local.Incarnation
}
// Start starts the SWIM protocol and all sub-protocols.
func (n *Node) Start() {
	// Bring the sub-protocols up first, then clear the stopped flag.
	n.gossip.Start()
	n.stateTransitions.Enable()
	n.healer.Start()

	n.state.Lock()
	defer n.state.Unlock()
	n.state.stopped = false
}
// Stop stops the SWIM protocol and all sub-protocols.
func (n *Node) Stop() {
	// Shut the sub-protocols down first, then raise the stopped flag.
	n.gossip.Stop()
	n.stateTransitions.Disable()
	n.healer.Stop()

	n.state.Lock()
	defer n.state.Unlock()
	n.state.stopped = true
}
// Stopped returns whether or not the SWIM protocol is currently stopped.
func (n *Node) Stopped() bool {
	n.state.RLock()
	defer n.state.RUnlock()
	return n.state.stopped
}
// Destroy stops the SWIM protocol and all sub-protocols. It is safe to
// call multiple times; only the first call has any effect.
func (n *Node) Destroy() {
	n.state.Lock()
	alreadyDestroyed := n.state.destroyed
	n.state.destroyed = true
	// Release the lock before calling Stop: Stop acquires the same
	// state lock and would deadlock otherwise.
	n.state.Unlock()

	if alreadyDestroyed {
		return
	}
	n.Stop()
}
// Destroyed returns whether or not the node has been destroyed.
func (n *Node) Destroyed() bool {
	n.state.RLock()
	defer n.state.RUnlock()
	return n.state.destroyed
}
// Ready returns whether or not the node has bootstrapped and is ready for use.
func (n *Node) Ready() bool {
	n.state.RLock()
	defer n.state.RUnlock()
	return n.state.ready
}
// RegisterSelfEvictHook registers systems that want to hook into the eviction
// sequence of the swim protocol. Registration is delegated to the node's
// selfEvict component.
func (n *Node) RegisterSelfEvictHook(hooks SelfEvictHook) error {
	return n.selfEvict.RegisterSelfEvictHook(hooks)
}
// SelfEvict initiates the self eviction sequence of ringpop, it will mark the
// node as faulty and calls systems that want to hook in to the sequence at the
// corresponding times. The sequence itself is implemented by the node's
// selfEvict component.
func (n *Node) SelfEvict() error {
	return n.selfEvict.SelfEvict()
}
//= = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = =
//
// Bootstrapping
//
//= = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = =
// BootstrapOptions is a configuration struct passed to Node.Bootstrap.
type BootstrapOptions struct {
	// The DiscoverProvider resolves a list of bootstrap hosts.
	DiscoverProvider discovery.DiscoverProvider

	// Stopped, when true, prevents gossip (and the partition healer)
	// from being started automatically after a successful bootstrap.
	Stopped bool

	// Amount of time before individual join requests time out.
	JoinTimeout time.Duration

	// Minimum number of nodes to join to satisfy a bootstrap.
	JoinSize int

	// Maximum time to attempt joins before the entire bootstrap process times
	// out.
	MaxJoinDuration time.Duration

	// A higher ParallelismFactor increases the number of nodes that a
	// bootstrapping node will attempt to reach out to in order to satisfy
	// `JoinSize` (the number of nodes that will be contacted at a time is
	// `ParallelismFactor * JoinSize`).
	ParallelismFactor int
}
// Bootstrap joins a node to a cluster. The channel provided to the node must be
// listening for the bootstrap to complete.
func (n *Node) Bootstrap(opts *BootstrapOptions) ([]string, error) {
	if n.channel == nil {
		return nil, errors.New("channel required")
	}
	if opts == nil {
		opts = &BootstrapOptions{}
	}

	n.discoverProvider = opts.DiscoverProvider

	joined, err := sendJoin(n, &joinOpts{
		timeout:           opts.JoinTimeout,
		size:              opts.JoinSize,
		maxJoinDuration:   opts.MaxJoinDuration,
		parallelismFactor: opts.ParallelismFactor,
	})
	if err != nil {
		n.logger.WithFields(log.Fields{
			"err": err.Error(),
		}).Error("bootstrap failed")
		return nil, err
	}

	// Unless the caller asked for a stopped bootstrap, start gossiping
	// and partition healing right away.
	if !opts.Stopped {
		n.gossip.Start()
		n.healer.Start()
	}

	n.state.Lock()
	n.state.ready = true
	n.state.Unlock()

	n.startTime = time.Now()
	return joined, nil
}
//= = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = =
//
// Change Handling
//
//= = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = =
// handleChanges records each change with the disseminator and schedules
// (or cancels) the state transition that matches the member's new status.
func (n *Node) handleChanges(changes []Change) {
	n.disseminator.AdjustMaxPropagations()

	for _, c := range changes {
		n.disseminator.RecordChange(c)

		switch c.Status {
		case Alive, Leave:
			// XXX: should Leave also reap?
			n.stateTransitions.Cancel(c)
		case Suspect:
			n.stateTransitions.ScheduleSuspectToFaulty(c)
		case Faulty:
			n.stateTransitions.ScheduleFaultyToTombstone(c)
		case Tombstone:
			n.stateTransitions.ScheduleTombstoneToEvict(c)
		}
	}
}
//= = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = =
//
// Gossip
//
//= = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = =
// pinging reports whether a ping round is currently in flight.
func (n *Node) pinging() bool {
	n.state.RLock()
	defer n.state.RUnlock()
	return n.state.pinging
}
// setPinging records whether a ping round is currently in flight.
func (n *Node) setPinging(pinging bool) {
	n.state.Lock()
	defer n.state.Unlock()
	n.state.pinging = pinging
}
// pingNextMember pings the next member in the memberlist. If the direct
// ping fails it falls back to indirect pings via helper nodes and marks
// the target suspect when those confirm it is unreachable.
func (n *Node) pingNextMember() {
	member, ok := n.memberiter.Next()
	if !ok {
		n.logger.Warn("no pingable members")
		return
	}

	// Atomically test-and-set the pinging flag under the state lock.
	// The previous check-then-set (n.pinging() followed by
	// n.setPinging(true)) allowed two concurrent callers to both pass
	// the check and ping at the same time.
	n.state.Lock()
	if n.state.pinging {
		n.state.Unlock()
		n.logger.Warn("node already pinging")
		return
	}
	n.state.pinging = true
	n.state.Unlock()
	defer n.setPinging(false)

	// send ping
	res, err := sendPing(n, member.Address, n.pingTimeout)
	if err == nil {
		n.memberlist.Update(res.Changes)
		return
	}

	// ping failed, send ping requests
	target := member.Address
	targetReached, errs := indirectPing(n, target, n.pingRequestSize, n.pingRequestTimeout)

	// if all helper nodes are unreachable, the indirectPing is inconclusive
	if len(errs) == n.pingRequestSize {
		n.logger.WithFields(log.Fields{
			"target":    target,
			"errors":    errs,
			"numErrors": len(errs),
		}).Warn("ping request inconclusive due to errors")
		return
	}

	if !targetReached {
		n.logger.WithField("target", target).Info("ping request target unreachable")
		n.memberlist.MakeSuspect(member.Address, member.Incarnation)
		return
	}

	n.logger.WithField("target", target).Info("ping request target reachable")
}
// GetReachableMembers returns a slice of members containing only the reachable
// members that satisfies the predicates passed in.
func (n *Node) GetReachableMembers(predicates ...MemberPredicate) []Member {
	// Tack the reachability predicate onto whatever the caller supplied.
	return n.memberlist.GetMembers(append(predicates, memberIsReachable)...)
}
// CountReachableMembers returns the number of reachable members currently in
// this node's membership list that satisfies all predicates passed in.
func (n *Node) CountReachableMembers(predicates ...MemberPredicate) int {
	// Tack the reachability predicate onto whatever the caller supplied.
	return n.memberlist.CountMembers(append(predicates, memberIsReachable)...)
}
// Labels returns a mutator for the labels kept on this local node. This mutator
// interacts with the local node and memberlist to change labels on this node
// and gossip those changes around.
func (n *Node) Labels() *NodeLabels {
	// NodeLabels wraps the node itself; each call returns a fresh wrapper.
	return &NodeLabels{n}
}
// SetIdentity changes the identity of the local node. This will change the
// state of the local node and will be gossiped around in the network.
// Internally the identity is stored as a label under membership.IdentityLabelKey.
func (n *Node) SetIdentity(identity string) error {
	return n.memberlist.SetLocalLabel(membership.IdentityLabelKey, identity)
}