swim/heal_partition.go (74 lines of code) (raw):
// Copyright (c) 2016 Uber Technologies, Inc.
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.
package swim
import "time"
// AttemptHeal tries to heal a partition between this node and the target.
//
// Be mindful that a single call will not result in a heal when there are
// nodes that first need to be reincarnated in order to take precedence over
// the faulty declarations made during a network partition. A cluster may
// therefore need several calls to this function, with some time in between,
// before it is healed.
//
// On a failed join request it returns nil and the join error. Otherwise it
// returns the addresses of the pingable members found in the join response,
// together with the error (if any) of the reincarnate or merge step.
//
// Check out ringpop-common/docs for a full description of the algorithm.
func AttemptHeal(node *Node, target string) ([]string, error) {
	node.EmitEvent(AttemptHealEvent{})
	node.logger.WithField("target", target).Info("attempt heal")

	// A join request that succeeds means a partition has been detected;
	// from here on this node coordinates the healing mechanism.
	joinRes, err := sendJoinRequest(node, target, time.Second)
	if err != nil {
		return nil, err
	}

	localMembership := node.disseminator.MembershipAsChanges()
	remoteMembership := joinRes.Membership

	// Work out which nodes cannot be merged as-is and have to reincarnate.
	forLocal, forRemote := nodesThatNeedToReincarnate(localMembership, remoteMembership)

	if len(forLocal) == 0 && len(forRemote) == 0 {
		// Nothing has to reincarnate, so the partitions can be merged.
		err = mergePartitions(node, target, remoteMembership)
	} else {
		// At least one side holds nodes that must reincarnate first; the
		// merge is left to a later attempt.
		err = reincarnateNodes(node, target, forLocal, forRemote)
	}

	return pingableHosts(remoteMembership), err
}
// nodesThatNeedToReincarnate finds all nodes that would become unpingable
// (>= faulty) when membership A gets merged with membership B and vice versa.
// These nodes need to be reincarnated before the partition can be healed
// with a merge.
//
// Only addresses that occur in both memberships are inspected. Every returned
// change is a copy with its source information scrubbed and its status set to
// Suspect; changesForA is intended for A and changesForB for B.
func nodesThatNeedToReincarnate(MA, MB []Change) (changesForA, changesForB []Change) {
	// asSuspect takes the change by value on purpose: the parameter type
	// guards against inadvertently switching from a value type to a pointer
	// type, so the scrubbing and status change are applied to a copy.
	//
	// The source information is removed because, if it were gossiped to the
	// other partition, it might cause the partitions to heal before they are
	// in a safe state.
	asSuspect := func(c Change) Change {
		c.scrubSource()
		// The suspect status is what gets gossiped.
		c.Status = Suspect
		return c
	}

	for _, remote := range MB {
		local, known := selectMember(MA, remote.Address)
		if !known {
			continue
		}

		// B can still ping the node, but A's entry would override it and
		// make it unpingable: not suited for merging, report it to B.
		if remote.isPingable() && local.overrides(remote) && !local.isPingable() {
			changesForB = append(changesForB, asSuspect(local))
		}

		// The mirrored situation: A can still ping the node, but B's entry
		// would override it and make it unpingable, so report it to A.
		if local.isPingable() && remote.overrides(local) && !remote.isPingable() {
			changesForA = append(changesForA, asSuspect(remote))
		}
	}

	return changesForA, changesForB
}
// reincarnateNodes applies changesForA to this node's membership and sends a
// ping carrying changesForB to the target, so that B applies those changes in
// its ping handler. The nodes are made to reincarnate by disseminating that
// they are suspect.
//
// No ping is sent when changesForB is empty; in that case nil is returned.
// Otherwise the error of the ping, if any, is returned.
func reincarnateNodes(node *Node, target string, changesForA, changesForB []Change) error {
	node.healer.logger.WithField("target", target).Info("reincarnate nodes before we can merge the partitions")

	// Local side: update our own memberlist with the changes meant for A.
	node.memberlist.Update(changesForA)

	// Remote side: only contact the target when there is something to send.
	if len(changesForB) == 0 {
		return nil
	}
	_, err := sendPingWithChanges(node, target, changesForB, time.Second)
	return err
}
// mergePartitions applies the membership of B to A and sends the membership
// of A to B piggybacked on top of a ping. It returns the error of that ping,
// if any.
func mergePartitions(node *Node, target string, MB []Change) error {
	node.healer.logger.WithField("target", target).Info("merge two partitions")

	// Take over B's membership on this node so that it gets disseminated
	// through partition A.
	node.memberlist.Update(MB)

	// Hand A's membership, read after the update above, to the target so
	// that it gets disseminated through partition B.
	_, err := sendPingWithChanges(node, target, node.disseminator.MembershipAsChanges(), time.Second)
	return err
}
// pingableHosts returns the addresses of the changes that are pingable, in
// the order in which they appear in changes. The result is nil when no change
// is pingable.
func pingableHosts(changes []Change) (ret []string) {
	for _, change := range changes {
		if !change.isPingable() {
			continue
		}
		ret = append(ret, change.Address)
	}
	return ret
}
// selectMember looks up the member with the given address in the partition.
// It returns the first matching change and true, or the zero Change and false
// when no member has that address.
func selectMember(partition []Change, address string) (Change, bool) {
	for i := range partition {
		if partition[i].Address == address {
			return partition[i], true
		}
	}

	var none Change
	return none, false
}