swim/gossip.go (122 lines of code) (raw):

// Copyright (c) 2015 Uber Technologies, Inc. // // Permission is hereby granted, free of charge, to any person obtaining a copy // of this software and associated documentation files (the "Software"), to deal // in the Software without restriction, including without limitation the rights // to use, copy, modify, merge, publish, distribute, sublicense, and/or sell // copies of the Software, and to permit persons to whom the Software is // furnished to do so, subject to the following conditions: // // The above copyright notice and this permission notice shall be included in // all copies or substantial portions of the Software. // // THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR // IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, // FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE // AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER // LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, // OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN // THE SOFTWARE. package swim import ( "math" "math/rand" "sync" "time" "github.com/rcrowley/go-metrics" log "github.com/uber-common/bark" "github.com/uber/ringpop-go/logging" ) // Gossip handles the protocol period of the SWIM protocol type gossip struct { node *Node state struct { sync.RWMutex running bool // control channels for background tasks protocolPeriodStop chan bool protocolPeriodWait <-chan bool protocolRateChannel chan bool } minProtocolPeriod time.Duration protocol struct { numPeriods int lastPeriod time.Time lastRate time.Duration timing metrics.Histogram sync.RWMutex } logger log.Logger } // newGossip returns a new gossip SWIM sub-protocol with the given protocol period func newGossip(node *Node, minProtocolPeriod time.Duration) *gossip { gossip := &gossip{ node: node, minProtocolPeriod: minProtocolPeriod, logger: logging.Logger("gossip").WithField("local", node.Address()), } gossip.protocol.timing = metrics.NewHistogram(metrics.NewUniformSample(10)) gossip.protocol.timing.Update(int64(gossip.minProtocolPeriod)) return gossip } // computes a delay for the gossip protocol period func (g *gossip) ComputeProtocolDelay() time.Duration { g.protocol.RLock() defer g.protocol.RUnlock() var delay time.Duration if g.protocol.numPeriods != 0 { target := g.protocol.lastPeriod.Add(g.protocol.lastRate) delay = time.Duration(math.Max(float64(target.Sub(time.Now())), float64(g.minProtocolPeriod))) } else { // delay for first tick in [0, minProtocolPeriod]ms delay = time.Duration(rand.Intn(int(g.minProtocolPeriod + 1))) } g.node.EmitEvent(ProtocolDelayComputeEvent{ Duration: delay, }) return delay } func (g *gossip) ProtocolRate() time.Duration { g.protocol.RLock() rate := g.protocol.lastRate g.protocol.RUnlock() return rate } // computes a ProtocolRate for the Gossip func (g *gossip) AdjustProtocolRate() { g.protocol.Lock() observed := g.protocol.timing.Percentile(0.5) * 2.0 g.protocol.lastRate = time.Duration(math.Max(observed, float64(g.minProtocolPeriod))) g.protocol.Unlock() } func (g *gossip) ProtocolTiming() metrics.Histogram { g.protocol.RLock() histogram := g.protocol.timing g.protocol.RUnlock() return histogram } // start the gossip protocol func (g *gossip) Start() { g.state.Lock() defer g.state.Unlock() if g.state.running { g.logger.Warn("gossip already started") return } // mark the state to be running g.state.running = true // schedule repeat execution in the background g.state.protocolPeriodStop, g.state.protocolPeriodWait = scheduleRepeaditly(g.ProtocolPeriod, g.ComputeProtocolDelay, g.node.clock) g.state.protocolRateChannel, _ = scheduleRepeaditly(g.AdjustProtocolRate, func() time.Duration { return time.Second }, g.node.clock) g.logger.Debug("started gossip protocol") } // stop the gossip protocol func (g *gossip) Stop() { g.state.Lock() defer g.state.Unlock() if !g.state.running { g.logger.Warn("gossip already stopped") return } g.state.running = false // stop background execution of running tasks close(g.state.protocolPeriodStop) close(g.state.protocolRateChannel) // wait for the goroutine to be stopped _ = <-g.state.protocolPeriodWait g.logger.Debug("stopped gossip protocol") } // returns whether or not the gossip sub-protocol is stopped func (g *gossip) Stopped() bool { g.state.RLock() stopped := !g.state.running g.state.RUnlock() return stopped } // run a gossip protocol period func (g *gossip) ProtocolPeriod() { startTime := time.Now() g.node.pingNextMember() endTime := time.Now() g.protocol.Lock() lag := endTime.Sub(g.protocol.lastPeriod) wasFirst := (g.protocol.numPeriods == 0) g.protocol.lastPeriod = endTime g.protocol.numPeriods++ g.protocol.timing.Update(int64(time.Now().Sub(startTime))) g.protocol.Unlock() if !wasFirst { g.node.EmitEvent(ProtocolFrequencyEvent{ Duration: lag, }) } }