swim/disseminator.go (192 lines of code) (raw):
// Copyright (c) 2015 Uber Technologies, Inc.
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.
package swim
import (
"math"
"sync"
"time"
log "github.com/uber-common/bark"
"github.com/uber/ringpop-go/logging"
)
var log10 = math.Log(10)
// defaultPFactor is the default piggyback factor value, described in the
// swim paper. It scales how many times a change is piggybacked on outgoing
// messages before the disseminator stops propagating it (see maxP).
const defaultPFactor int = 15
// A pChange is a change with a p count representing the number of times the
// change has been propagated to other nodes.
type pChange struct {
	Change

	// p counts how many times this change has been piggybacked onto an
	// outgoing message; once it reaches the disseminator's maxP the change
	// is removed from dissemination.
	p int
}
// A disseminator propagates changes to other nodes.
type disseminator struct {
	// node is the local node this disseminator serves.
	node *Node

	// changes maps a member address to the change currently being
	// disseminated for that member, together with its piggyback count.
	changes map[string]*pChange

	// maxP indicates how many times a change is disseminated. It is declared
	// in the swim paper as piggybackfactor * log(# nodes).
	maxP int

	// pFactor is the piggyback factor used when recomputing maxP.
	pFactor int

	// embedded RWMutex guards changes and maxP.
	sync.RWMutex

	logger log.Logger

	// reverseFullSyncJobs bounds the number of concurrently running reverse
	// full sync goroutines; each running job holds one token in the channel.
	reverseFullSyncJobs chan struct{}
}
// newDisseminator returns a new Disseminator instance for the given node,
// with piggyback factor and max propagation count initialized to the
// default, and a job channel sized to the node's reverse full sync limit.
func newDisseminator(n *Node) *disseminator {
	return &disseminator{
		node:                n,
		changes:             map[string]*pChange{},
		maxP:                defaultPFactor,
		pFactor:             defaultPFactor,
		logger:              logging.Logger("disseminator").WithField("local", n.Address()),
		reverseFullSyncJobs: make(chan struct{}, n.maxReverseFullSyncJobs),
	}
}
// AdjustMaxPropagations recomputes the max propagation count from the
// current number of pingable members, as pFactor * ceil(log10(n+1)), and
// emits a MaxPAdjustedEvent whenever the value actually changes.
func (d *disseminator) AdjustMaxPropagations() {
	d.Lock()
	defer d.Unlock()

	pingable := d.node.memberlist.NumPingableMembers()
	oldMaxP := d.maxP
	updatedMaxP := d.pFactor * int(math.Ceil(math.Log(float64(pingable+1))/log10))

	if updatedMaxP == oldMaxP {
		return
	}

	d.maxP = updatedMaxP
	d.node.EmitEvent(MaxPAdjustedEvent{oldMaxP, updatedMaxP})

	d.logger.WithFields(log.Fields{
		"newMax":            updatedMaxP,
		"prevMax":           oldMaxP,
		"propagationFactor": d.pFactor,
		"numPingable":       pingable,
	}).Debug("adjusted max propagation count")
}
// HasChanges reports whether disseminator has changes to disseminate.
func (d *disseminator) HasChanges() bool {
	d.RLock()
	defer d.RUnlock()
	return len(d.changes) != 0
}
// MembershipAsChanges converts the entire local membership into a list of
// outgoing changes, each one attributed to the local member as its source.
// Returns nil when the membership is empty.
func (d *disseminator) MembershipAsChanges() (changes []Change) {
	d.Lock()
	defer d.Unlock()

	local := d.node.memberlist.LocalMember()
	for _, m := range d.node.memberlist.GetMembers() {
		var c Change
		c.populateSubject(&m)
		c.populateSource(&local)
		changes = append(changes, c.validateOutgoing())
	}
	return changes
}
// IssueAsSender collects all changes a node needs when sending a ping or
// ping-req. The second return value is a callback that raises the piggyback
// counters of the given changes.
func (d *disseminator) IssueAsSender() ([]Change, func()) {
	issued := d.issueChanges()
	bump := func() {
		d.bumpPiggybackCounters(issued)
	}
	return issued, bump
}
// bumpPiggybackCounters increments the piggyback counter of every given
// change still tracked by the disseminator, and stops disseminating any
// change whose counter has reached maxP. Changes no longer tracked (e.g.
// already cleared) are skipped.
func (d *disseminator) bumpPiggybackCounters(changes []Change) {
	d.Lock()
	defer d.Unlock()

	for i := range changes {
		tracked, found := d.changes[changes[i].Address]
		if !found {
			continue
		}

		tracked.p++
		if tracked.p >= d.maxP {
			delete(d.changes, tracked.Address)
		}
	}
}
// IssueAsReceiver collects all changes a node needs when responding to a ping
// or ping-req. Unlike IssueAsSender, IssueAsReceiver automatically increments
// the piggyback counters because it's difficult to find out whether a response
// reaches the client. The second return value indicates whether a full sync
// is triggered.
func (d *disseminator) IssueAsReceiver(
	senderAddress string,
	senderIncarnation int64,
	senderChecksum uint32) (changes []Change, fullSync bool) {

	// drop changes that originated at the sender itself; it already knows them
	issued := d.filterChangesFromSender(d.issueChanges(), senderAddress, senderIncarnation)
	d.bumpPiggybackCounters(issued)

	if len(issued) != 0 || d.node.memberlist.Checksum() == senderChecksum {
		return issued, false
	}

	// nothing to piggyback yet checksums disagree: fall back to a full sync
	d.node.EmitEvent(FullSyncEvent{senderAddress, senderChecksum})
	d.node.logger.WithFields(log.Fields{
		"localChecksum":  d.node.memberlist.Checksum(),
		"remote":         senderAddress,
		"remoteChecksum": senderChecksum,
	}).Info("full sync")

	return d.MembershipAsChanges(), true
}
// filterChangesFromSender returns changes that didn't originate at the
// sender (i.e. whose Source/SourceIncarnation don't both match the given
// source and incarnation), emitting a ChangeFilteredEvent for each change
// it removes.
// Attention, this function reorders the underlying input array: removed
// elements are swapped to the tail and the slice is truncated, so callers
// must not rely on the original element order.
func (d *disseminator) filterChangesFromSender(cs []Change, source string, incarnation int64) []Change {
	for i := 0; i < len(cs); i++ {
		if incarnation == cs[i].SourceIncarnation && source == cs[i].Source {
			d.node.EmitEvent(ChangeFilteredEvent{cs[i]})
			// swap, and not just overwrite, so that in the end only the order
			// of the underlying array has changed.
			cs[i], cs[len(cs)-1] = cs[len(cs)-1], cs[i]
			cs = cs[:len(cs)-1]
			// re-examine index i: it now holds the element swapped in from
			// the tail, which hasn't been checked yet.
			i--
		}
	}
	return cs
}
// issueChanges snapshots the currently tracked changes as a validated list
// suitable for sending to a peer, and emits a ChangesCalculatedEvent with
// the result (outside the lock).
func (d *disseminator) issueChanges() []Change {
	d.Lock()
	// Pre-sized but non-nil so an empty change list marshals to [] in JSON
	// instead of null.
	issued := make([]Change, 0, len(d.changes))
	for _, tracked := range d.changes {
		issued = append(issued, tracked.Change.validateOutgoing())
	}
	d.Unlock()

	d.node.EmitEvent(ChangesCalculatedEvent{issued})
	return issued
}
// ClearChanges drops every change currently being disseminated.
func (d *disseminator) ClearChanges() {
	d.Lock()
	defer d.Unlock()
	d.changes = map[string]*pChange{}
}
// RecordChange starts (or restarts) disseminating the given change, keyed
// by the subject's address, with a fresh piggyback counter of zero.
func (d *disseminator) RecordChange(change Change) {
	d.Lock()
	defer d.Unlock()
	d.changes[change.Address] = &pChange{Change: change, p: 0}
}
// ClearChange stops disseminating the change recorded for the given
// address, if any.
func (d *disseminator) ClearChange(address string) {
	d.Lock()
	defer d.Unlock()
	delete(d.changes, address)
}
// ChangesByAddress returns the change currently tracked for the given
// address and whether one exists; the zero Change is returned when absent.
func (d *disseminator) ChangesByAddress(address string) (Change, bool) {
	d.RLock()
	defer d.RUnlock()

	if tracked, ok := d.changes[address]; ok {
		return tracked.Change, true
	}
	return Change{}, false
}
// ChangesCount returns the number of changes currently being disseminated.
func (d *disseminator) ChangesCount() int {
	d.RLock()
	defer d.RUnlock()
	return len(d.changes)
}
// tryStartReverseFullSync fires a goroutine that performs a full sync. We omit
// the reverse full sync if the maximum number of processes are already
// running. This ensures no more than reverseFullSyncJobs processes are
// running concurrently.
func (d *disseminator) tryStartReverseFullSync(target string, timeout time.Duration) {
	// non-blocking attempt to claim a job slot; bail out when all are taken
	select {
	case d.reverseFullSyncJobs <- struct{}{}:
	default:
		d.logger.WithFields(log.Fields{
			"remote": target,
		}).Info("omit bidirectional full sync, too many already running")
		d.node.EmitEvent(OmitReverseFullSyncEvent{Target: target})
		return
	}

	// run the reverse full sync asynchronously, releasing the slot when done
	go func() {
		defer func() { <-d.reverseFullSyncJobs }()
		d.reverseFullSync(target, timeout)
	}()
}
// reverseFullSync is the second part of a bidirectional full sync. The first
// part is performed by the IssueAsReceiver method. The reverse full sync
// ensures that this node merges the target node's membership with its own,
// by issuing a join request to the target and applying the response.
func (d *disseminator) reverseFullSync(target string, timeout time.Duration) {
	d.node.EmitEvent(StartReverseFullSyncEvent{Target: target})

	res, err := sendJoinRequest(d.node, target, timeout)
	if err != nil || res == nil {
		d.logger.WithFields(log.Fields{
			"remote": target,
			"error":  err,
		}).Warn("bidirectional full sync failed due to failed join request")
		return
	}

	d.logger.WithFields(log.Fields{
		"remote": target,
	}).Info("bidirectional full sync")

	// an empty set of applied changes means we already knew everything
	applied := d.node.memberlist.Update(res.Membership)
	if len(applied) == 0 {
		d.node.EmitEvent(RedundantReverseFullSyncEvent{Target: target})
	}
}