swim/self_evict.go (193 lines of code) (raw):

// Copyright (c) 2016 Uber Technologies, Inc. // // Permission is hereby granted, free of charge, to any person obtaining a copy // of this software and associated documentation files (the "Software"), to deal // in the Software without restriction, including without limitation the rights // to use, copy, modify, merge, publish, distribute, sublicense, and/or sell // copies of the Software, and to permit persons to whom the Software is // furnished to do so, subject to the following conditions: // // The above copyright notice and this permission notice shall be included in // all copies or substantial portions of the Software. // // THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR // IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, // FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE // AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER // LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, // OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN // THE SOFTWARE. package swim import ( "errors" "math" "sync" "sync/atomic" "time" "github.com/uber-common/bark" "github.com/uber/ringpop-go/util" ) var ( // ErrDuplicateHook is returned when a hook that has already been registered // is registered again ErrDuplicateHook = errors.New("hook already registered") // ErrSelfEvictionInProgress is returned when ringpop is already in the process // of evicting itself from the network. ErrSelfEvictionInProgress = errors.New("ringpop is already executing a self-eviction") ) // SelfEvict defines the functions that interact with the self eviction of nodes // from the membership prior to shutting down type SelfEvict interface { // RegisterSelfEvictHook is used to register a SelfEvictHook interface to be // called during the shutting down of ringpop. Hooks can't be registered // after the self eviction has started. RegisterSelfEvictHook(hooks SelfEvictHook) error // SelfEvict should be called before shutting down the application to notify // the members of the membership that this node is going down and should not // receive reqeusts anymore. SelfEvict() error } // SelfEvictHook is an interface describing a module that can be registered to // the self eviction hooks type SelfEvictHook interface { // PreEvict is the hook that will be called before ringpop evicts itself // from the membership PreEvict() // PostEvict is the hook that will be called after ringpop has evicted // itself from them memership PostEvict() } type hookFn func(SelfEvictHook) // SelfEvictOptions configures how self eviction should behave. Applications can // configure if ringpop should proactively ping members of the network on self // eviction and what percentage/ratio of the memberlist should be pinged at most type SelfEvictOptions struct { PingRatio float64 } func mergeSelfEvictOptions(opt, def SelfEvictOptions) SelfEvictOptions { return SelfEvictOptions{ PingRatio: util.SelectFloat(opt.PingRatio, def.PingRatio), } } type evictionPhase int const ( preEvict evictionPhase = iota evicting postEvict done ) type phase struct { phase evictionPhase start time.Time end time.Time // phase specific information // phase: evicting numberOfPings int numberOfSuccessfulPings int32 } type selfEvict struct { // lock is used to gate access to state changing operations like registering // hooks and starting self eviction. After eviction has started all state // changing functions should return ErrSelfEvictionInProgress lock sync.Mutex node *Node options SelfEvictOptions logger bark.Logger phases []*phase hooks []SelfEvictHook } func newSelfEvict(node *Node, options SelfEvictOptions) *selfEvict { return &selfEvict{ node: node, options: options, logger: node.logger, } } // RegisterSelfEvictHook registers a pre/post eviction hook pair. If the hook // has already been registered before it returns the ErrDuplicateHook and does // not register it twice. func (s *selfEvict) RegisterSelfEvictHook(hook SelfEvictHook) error { // lock the phases to prevent reading the current phase in a race. Unlocking // will happen when this function terminates to prevent self-eviction to // start while a hook is being added s.lock.Lock() defer s.lock.Unlock() if s.currentPhase() != nil { return ErrSelfEvictionInProgress } for _, h := range s.hooks { if h == hook { return ErrDuplicateHook } } s.hooks = append(s.hooks, hook) return nil } func (s *selfEvict) SelfEvict() error { // preEvict makes sure that SelfEvict will return ErrSelfEvictionInProgress // when eviction is already in progress err := s.preEvict() if err != nil { return err } s.evict() s.postEvict() s.done() return nil } func (s *selfEvict) preEvict() error { s.lock.Lock() if s.currentPhase() != nil { s.lock.Unlock() return ErrSelfEvictionInProgress } s.transitionTo(preEvict) s.lock.Unlock() s.logger.Info("ringpop is initiating self eviction sequence") s.runHooks(SelfEvictHook.PreEvict) return nil } func (s *selfEvict) evict() { s.lock.Lock() phase := s.transitionTo(evicting) s.lock.Unlock() s.node.memberlist.SetLocalStatus(Faulty) numberOfPingableMembers := s.node.memberlist.NumPingableMembers() maxNumberOfPings := int(math.Ceil(float64(numberOfPingableMembers) * s.options.PingRatio)) // final number of members to ping should not exceed any of: numberOfPings := util.Min( s.node.disseminator.maxP, // the piggyback counter numberOfPingableMembers, // the number of members we can ping maxNumberOfPings, // a configured percentage of members ) if numberOfPings <= 0 { // there are no nodes to be pinged, a value below 0 can be caused by a // negative ping ratio return } // select the members we are going to ping targets := s.node.memberlist.RandomPingableMembers(numberOfPings, nil) phase.numberOfPings = len(targets) s.logger.WithFields(bark.Fields{ "numberOfPings": phase.numberOfPings, "targets": targets, }).Debug("starting proactive gossip on self evict") var wg sync.WaitGroup wg.Add(len(targets)) for _, target := range targets { go func(target Member) { defer wg.Done() _, err := sendPing(s.node, target.address(), s.node.pingTimeout) if err == nil { atomic.AddInt32(&phase.numberOfSuccessfulPings, 1) } }(target) } wg.Wait() s.logger.WithFields(bark.Fields{ "numberOfPings": phase.numberOfPings, "numberOfSuccessfulPings": phase.numberOfSuccessfulPings, }).Debug("finished proactive gossip on self evict") } func (s *selfEvict) postEvict() { s.lock.Lock() s.transitionTo(postEvict) s.lock.Unlock() s.runHooks(SelfEvictHook.PostEvict) } func (s *selfEvict) done() { s.lock.Lock() s.transitionTo(done) phasesCount := len(s.phases) firstPhase := s.phases[0].start lastPhase := s.phases[phasesCount-1] s.lock.Unlock() duration := lastPhase.end.Sub(firstPhase) s.logger.WithFields(bark.Fields{ "phases": s.phases, "totalDuration": duration, }).Info("ringpop self eviction done") s.node.EmitEvent(SelfEvictedEvent{ PhasesCount: phasesCount, Duration: duration, }) } func (s *selfEvict) transitionTo(newPhase evictionPhase) *phase { p := &phase{ phase: newPhase, start: s.node.clock.Now(), } previousPhase := s.currentPhase() s.phases = append(s.phases, p) if previousPhase != nil { previousPhase.end = p.start } s.logger.WithFields(bark.Fields{ "newPhase": p, "oldPhase": previousPhase, }).Debug("ringpop self eviction phase-transitioning") return p } func (s *selfEvict) currentPhase() *phase { if len(s.phases) > 0 { return s.phases[len(s.phases)-1] } return nil } func (s *selfEvict) runHooks(dispatch hookFn) { var wg sync.WaitGroup wg.Add(len(s.hooks)) for _, hook := range s.hooks { go func(hook SelfEvictHook) { defer wg.Done() s.logger.Debugf("ringpop self eviction running hook: %+v", hook) dispatch(hook) s.logger.Debugf("ringpop self eviction hook done: %+v", hook) }(hook) } wg.Wait() s.logger.Debug("ringpop self eviction done running hooks") }