in swim/memberlist.go [519:629]
func (m *memberlist) Update(changes []Change) (applied []Change) {
if m.node.Stopped() || len(changes) == 0 {
return nil
}
// validate incoming changes
for i, change := range changes {
changes[i] = change.validateIncoming()
}
m.node.EmitEvent(MemberlistChangesReceivedEvent{changes})
var memberChanges []membership.MemberChange
m.Lock()
// run through all changes received and figure out if they need to be accepted
m.members.Lock()
for _, change := range changes {
member, has := m.members.byAddress[change.Address]
// transform the change into a member that we can test against existing
// members
gossip := Member{}
gossip.populateFromChange(&change)
// test to see if we need to process the gossip
if shouldProcessGossip(member, &gossip) {
// the gossip overwrites the know state about the member
if gossip.Address == m.local.Address {
// if the gossip is about the local member it needs to be
// countered by increasing the incarnation number and gossip the
// new state to the network.
change = m.bumpIncarnation()
m.node.EmitEvent(RefuteUpdateEvent{})
} else {
// otherwise it can be applied to the memberlist
// prepare the change and collect if there is an outside
// observable change eg. changes that involve active
// participants of the membership (pingable)
memberChange := membership.MemberChange{}
if has && member.isReachable() {
memberChange.Before = *member
}
if gossip.isReachable() {
memberChange.After = gossip
}
if memberChange.Before != nil || memberChange.After != nil {
memberChanges = append(memberChanges, memberChange)
}
if !has {
// if the member was not already present in the list we will
// add it and assign it a random position in the list to ensure
// guarantees for pinging
m.members.byAddress[gossip.Address] = &gossip
i := m.getJoinPosition()
m.members.list = append(m.members.list[:i], append([]*Member{&gossip}, m.members.list[i:]...)...)
} else {
// copy the value of the gossip into the already existing
// struct. This operation is by value, not by reference.
// this is to keep both the list and byAddress map in sync
// without tedious lookup operations.
*member = gossip
}
}
// keep track of the change that it has been applied
applied = append(applied, change)
}
}
m.members.Unlock()
if len(applied) > 0 {
// when there are changes applied we need to recalculate our checksum
oldChecksum := m.Checksum()
m.ComputeChecksum()
for _, change := range applied {
if change.Source != m.node.address {
m.logger.WithFields(bark.Fields{
"remote": change.Source,
}).Debug("ringpop applied remote update")
}
}
m.node.EmitEvent(MemberlistChangesAppliedEvent{
Changes: applied,
OldChecksum: oldChecksum,
NewChecksum: m.Checksum(),
NumMembers: m.NumMembers(),
})
m.node.handleChanges(applied)
}
// if there are changes that are important for outside observers of the
// membership emit those
if len(memberChanges) > 0 {
m.node.EmitEvent(membership.ChangeEvent{
Changes: memberChanges,
})
}
m.Unlock()
return applied
}