// swim/join_sender.go
// Copyright (c) 2015 Uber Technologies, Inc.
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.
package swim
import (
"errors"
"fmt"
"math/rand"
"sync"
"time"
log "github.com/uber-common/bark"
"github.com/uber/ringpop-go/logging"
"github.com/uber/ringpop-go/shared"
"github.com/uber/ringpop-go/util"
"github.com/uber/tchannel-go/json"
)
const (
	// If a node cannot complete a join within defaultMaxJoinDuration
	// there is likely something very wrong. The aim is for the join
	// operation to take no more than 1s, under normal conditions.
	//
	// The duration assigned below is very high for the following
	// purposes:
	// - Gives an application developer some time to diagnose
	//   what could be wrong.
	// - Gives an operator some time to bootstrap a newly
	//   provisioned cluster
	// - Trying forever is futile
	defaultMaxJoinDuration = 120 * time.Second

	// defaultJoinTimeout is the timeout applied to a single join request
	// when joinOpts does not provide one.
	defaultJoinTimeout = time.Second

	// defaultJoinSize is the number of nodes a join must reach before it is
	// considered complete, when joinOpts does not provide one.
	defaultJoinSize = 3

	// defaultParallelismFactor is the default multiplier on the number of
	// nodes still needed, used to size each group of parallel join requests.
	defaultParallelismFactor = 2
)

// errJoinTimeout is returned by sendJoinRequest when the join call does not
// complete within its timeout.
var errJoinTimeout = errors.New("join timed out")
// A joinRequest is used to request a join to a remote node
type joinRequest struct {
	// App is the application name of the joining node.
	App string `json:"app"`
	// Source is the address of the node requesting the join.
	Source string `json:"source"`
	// Incarnation is the source node's incarnation number.
	Incarnation int64 `json:"incarnationNumber"`
	// Timeout is the timeout used for the join request.
	Timeout time.Duration `json:"timeout"`
	// Labels carries the source node's labels, omitted when empty.
	Labels map[string]string `json:"labels,omitempty"`
}
// joinOpts are opts to perform a join with
type joinOpts struct {
	// timeout for a single join request; defaults to defaultJoinTimeout.
	timeout time.Duration
	// size is the number of nodes to join before the join is considered
	// complete; defaults to defaultJoinSize.
	size int
	// maxJoinDuration bounds the total time spent attempting to join;
	// defaults to defaultMaxJoinDuration.
	maxJoinDuration time.Duration
	// parallelismFactor multiplies the number of joins still needed when
	// sizing a group of parallel join requests; defaults to
	// defaultParallelismFactor.
	parallelismFactor int

	// delayer delays repeated join attempts.
	delayer joinDelayer
}
// A joinSender is used to join an existing cluster of nodes defined in a node's
// bootstrap list
type joinSender struct {
	node *Node

	// timeout applied to each individual join request.
	timeout time.Duration

	// bootstrapHostsMap is a map of unique hosts each containing a slice of
	// the instances (hostsports) on that particular host.
	bootstrapHostsMap map[string][]string

	// This is used as a multiple of the required nodes left to satisfy
	// `joinSize`. Additional parallelism can be applied in order for
	// `joinSize` to be satisfied faster.
	parallelismFactor int

	// maxJoinDuration bounds the total time JoinCluster may spend joining.
	maxJoinDuration time.Duration

	// potentialNodes are all joinable nodes; preferredNodes are those on a
	// different host than the local node; nonPreferredNodes are the joinable
	// nodes that share the local node's host.
	potentialNodes    []string
	preferredNodes    []string
	nonPreferredNodes []string

	// We either join the number of nodes defined by size or limit it to the number
	// of `potentialNodes` as we can't join more than there are to join in the first place.
	size int

	// A round is a complete cycle through all potential join targets. When a round
	// is completed we start all over again, though full cycles should be very rare.
	// We try to join nodes until `joinSize` is reached or `maxJoinDuration` is exceeded.
	roundPotentialNodes    []string
	roundPreferredNodes    []string
	roundNonPreferredNodes []string

	// numTries counts the join groups attempted so far.
	numTries int

	// delayer delays repeated join attempts.
	delayer joinDelayer

	logger log.Logger
}
// newJoinSender returns a new JoinSender to join a cluster with. It resolves
// the bootstrap hosts via the node's discover provider and applies defaults
// for any join options that were not supplied.
func newJoinSender(node *Node, opts *joinOpts) (*joinSender, error) {
	if opts == nil {
		opts = &joinOpts{}
	}

	if node.discoverProvider == nil {
		return nil, errors.New("no discover provider")
	}

	// Resolve/retrieve bootstrap hosts from the provider specified in the
	// join options.
	hosts, err := node.discoverProvider.Hosts()
	if err != nil {
		return nil, err
	}

	// Make sure the local node appears in the bootstrap host list; if the
	// list is empty this yields a single-node cluster.
	if !util.StringInSlice(hosts, node.Address()) {
		hosts = append(hosts, node.Address())
	}

	sender := &joinSender{
		node:   node,
		logger: logging.Logger("join").WithField("local", node.Address()),
	}

	// Parse bootstrap hosts into a map and compute the initial join targets.
	sender.parseHosts(hosts)
	sender.potentialNodes = sender.CollectPotentialNodes(nil)

	sender.timeout = util.SelectDuration(opts.timeout, defaultJoinTimeout)
	sender.maxJoinDuration = util.SelectDuration(opts.maxJoinDuration, defaultMaxJoinDuration)
	sender.parallelismFactor = util.SelectInt(opts.parallelismFactor, defaultParallelismFactor)

	// Never attempt to join more nodes than there are available to join.
	sender.size = util.Min(util.SelectInt(opts.size, defaultJoinSize), len(sender.potentialNodes))

	if sender.delayer = opts.delayer; sender.delayer == nil {
		// Fall back to an exponential delayer created with nil opts, which
		// uses the default delayOpts.
		delayer, derr := newExponentialDelayer(sender.node.address, nil)
		if derr != nil {
			return nil, derr
		}
		sender.delayer = delayer
	}

	return sender, nil
}
// parseHosts populates the bootstrap hosts map from the provided slice of
// hostports and logs a warning for each sanity check that fails.
func (j *joinSender) parseHosts(hostports []string) {
	// Group the bootstrap hostports by host.
	j.bootstrapHostsMap = util.HostPortsByHost(hostports)

	localHost := util.CaptureHost(j.node.address)

	// Warn when the local node is missing from its own host's entry.
	if err := util.CheckLocalMissing(j.node.address, j.bootstrapHostsMap[localHost]); err != nil {
		j.logger.Warn(err.Error())
	}

	// Warn when hostnames and IPs are mixed across the bootstrap list.
	if mismatched, err := util.CheckHostnameIPMismatch(j.node.address, j.bootstrapHostsMap); err != nil {
		j.logger.WithField("mismatched", mismatched).Warn(err.Error())
	}
}
// CollectPotentialNodes returns every bootstrap hostport that can still be
// joined: all instances that are neither the local node nor already present
// in nodesJoined.
func (j *joinSender) CollectPotentialNodes(nodesJoined []string) []string {
	if nodesJoined == nil {
		nodesJoined = []string{}
	}

	var result []string
	for _, instances := range j.bootstrapHostsMap {
		for _, instance := range instances {
			if instance == j.node.address {
				continue
			}
			if util.StringInSlice(nodesJoined, instance) {
				continue
			}
			result = append(result, instance)
		}
	}
	return result
}
// CollectPreferredNodes returns the bootstrap hostports that live on a
// different host than the local node.
func (j *joinSender) CollectPreferredNodes() []string {
	localHost := util.CaptureHost(j.node.address)

	var preferred []string
	for host, instances := range j.bootstrapHostsMap {
		if host == localHost {
			continue
		}
		preferred = append(preferred, instances...)
	}
	return preferred
}
// CollectNonPreferredNodes returns the joinable hostports that share a host
// with the local node. When there are no preferred nodes at all, every
// potential node counts as non-preferred.
func (j *joinSender) CollectNonPreferredNodes() []string {
	if len(j.preferredNodes) == 0 {
		return j.potentialNodes
	}

	localInstances := j.bootstrapHostsMap[util.CaptureHost(j.node.address)]

	var nonPreferred []string
	for _, instance := range localInstances {
		if instance != j.node.address {
			nonPreferred = append(nonPreferred, instance)
		}
	}
	return nonPreferred
}
// Init recomputes the joinable node lists, excluding nodes that are already
// joined, and refills the per-round lists from them.
func (j *joinSender) Init(nodesJoined []string) {
	rand.Seed(time.Now().UnixNano())

	// Order matters: non-preferred nodes are derived from the preferred and
	// potential lists.
	j.potentialNodes = j.CollectPotentialNodes(nodesJoined)
	j.preferredNodes = j.CollectPreferredNodes()
	j.nonPreferredNodes = j.CollectNonPreferredNodes()

	// Top up each round list with the freshly collected nodes.
	for _, pair := range []struct {
		round *[]string
		nodes []string
	}{
		{&j.roundPotentialNodes, j.potentialNodes},
		{&j.roundPreferredNodes, j.preferredNodes},
		{&j.roundNonPreferredNodes, j.nonPreferredNodes},
	} {
		*pair.round = append(*pair.round, pair.nodes...)
	}
}
// SelectGroup selects the next group of nodes to attempt to join. Preferred
// nodes are drawn before non-preferred ones, and the group is capped at
// parallelismFactor times the number of joins still needed.
func (j *joinSender) SelectGroup(nodesJoined []string) []string {
	// If fully exhausted or on the first round, initialize this round's
	// node lists.
	if len(j.roundPreferredNodes) == 0 && len(j.roundNonPreferredNodes) == 0 {
		j.Init(nodesJoined)
	}

	groupLimit := (j.size - len(nodesJoined)) * j.parallelismFactor

	var group []string
	for len(group) != groupLimit {
		switch {
		case len(j.roundPreferredNodes) > 0:
			group = append(group, util.TakeNode(&j.roundPreferredNodes, -1))
		case len(j.roundNonPreferredNodes) > 0:
			group = append(group, util.TakeNode(&j.roundNonPreferredNodes, -1))
		default:
			// Both round lists are exhausted.
			return group
		}
	}
	return group
}
// JoinCluster repeatedly joins groups of bootstrap nodes until `size` nodes
// have been joined or `maxJoinDuration` is exceeded. It returns the addresses
// of the nodes that were joined, and an error if the join failed (node
// destroyed, or max duration exceeded).
func (j *joinSender) JoinCluster() ([]string, error) {
	var nodesJoined []string
	var numGroups = 0
	var numJoined = 0
	var numFailed = 0
	var startTime = time.Now()

	// A single-node cluster has nobody else to join.
	if util.SingleNodeCluster(j.node.address, j.bootstrapHostsMap) {
		j.logger.Info("got single node cluster to join")
		return nodesJoined, nil
	}

	for {
		// Stop immediately if the node was destroyed mid-join.
		if j.node.Destroyed() {
			j.node.EmitEvent(JoinFailedEvent{
				Reason: Destroyed,
				Error:  nil,
			})
			return nil, errors.New("node destroyed while attempting to join cluster")
		}
		// join group of nodes
		successes, failures := j.JoinGroup(nodesJoined)

		nodesJoined = append(nodesJoined, successes...)
		numJoined += len(successes)
		numFailed += len(failures)
		numGroups++

		if numJoined >= j.size {
			j.logger.WithFields(log.Fields{
				"joinSize":  j.size,
				"joinTime":  time.Since(startTime),
				"numJoined": numJoined,
				"numFailed": numFailed,
				"numGroups": numGroups,
			}).Debug("join complete")

			break
		}

		joinDuration := time.Since(startTime)

		// Give up entirely once the total join budget is spent.
		if joinDuration > j.maxJoinDuration {
			j.logger.WithFields(log.Fields{
				"joinDuration":    joinDuration,
				"maxJoinDuration": j.maxJoinDuration,
				"numJoined":       numJoined,
				"numFailed":       numFailed,
				"startTime":       startTime,
			}).Warn("max join duration exceeded")

			err := fmt.Errorf("join duration of %v exceeded max %v",
				joinDuration, j.maxJoinDuration)

			j.node.EmitEvent(JoinFailedEvent{
				Reason: Error,
				Error:  err,
			})
			return nodesJoined, err
		}

		j.logger.WithFields(log.Fields{
			"joinSize":  j.size,
			"numJoined": numJoined,
			"numFailed": numFailed,
			"startTime": startTime,
		}).Debug("join not yet complete")

		// Back off before the next attempt.
		j.delayer.delay()
	}

	j.node.EmitEvent(JoinCompleteEvent{
		Duration:  time.Since(startTime),
		NumJoined: numJoined,
		Joined:    nodesJoined,
	})

	return nodesJoined, nil
}
// JoinGroup collects a number of nodes to join and sends join requests to
// them in parallel. nodesJoined contains the nodes that are already joined.
// The method returns the nodes that were successfully joined, and the nodes
// that failed to respond.
func (j *joinSender) JoinGroup(nodesJoined []string) ([]string, []string) {
	group := j.SelectGroup(nodesJoined)

	// successes/failures are appended to from multiple goroutines; the
	// embedded mutex guards both slices.
	var responses struct {
		successes []string
		failures  []string
		sync.Mutex
	}

	var numNodesLeft = j.size - len(nodesJoined)
	var startTime = time.Now()

	var wg sync.WaitGroup

	j.numTries++
	j.node.EmitEvent(JoinTriesUpdateEvent{j.numTries})

	for _, target := range group {
		wg.Add(1)

		go func(target string) {
			defer wg.Done()

			res, err := sendJoinRequest(j.node, target, j.timeout)
			if err != nil {
				msg := "attempt to join node failed"
				if err == errJoinTimeout {
					msg = "attempt to join node timed out"
				}

				j.logger.WithFields(log.Fields{
					"remote":  target,
					"timeout": j.timeout,
				}).Debug(msg)

				responses.Lock()
				responses.failures = append(responses.failures, target)
				responses.Unlock()
				return
			}

			responses.Lock()
			responses.successes = append(responses.successes, target)
			responses.Unlock()

			// Merge the remote membership into the local memberlist.
			start := time.Now()
			j.node.memberlist.AddJoinList(res.Membership)
			j.node.EmitEvent(AddJoinListEvent{
				Duration: time.Since(start),
			})
		}(target)
	}

	// wait for joins to complete
	wg.Wait()

	// don't need to lock successes/failures since we're finished writing to them
	j.logger.WithFields(log.Fields{
		"groupSize":    len(group),
		"joinSize":     j.size,
		"joinTime":     time.Since(startTime),
		"numNodesLeft": numNodesLeft,
		"numFailures":  len(responses.failures),
		"failures":     responses.failures,
		"numSuccesses": len(responses.successes),
		"successes":    responses.successes,
	}).Debug("join group complete")

	return responses.successes, responses.failures
}
// sendJoinRequest sends a join request to the specified target and returns
// the response. It returns errJoinTimeout if the request does not complete
// within the given timeout.
func sendJoinRequest(node *Node, target string, timeout time.Duration) (*joinResponse, error) {
	ctx, cancel := shared.NewTChannelContext(timeout)
	defer cancel()

	peer := node.channel.Peers().GetOrAdd(target)

	req := joinRequest{
		App:         node.app,
		Source:      node.address,
		Incarnation: node.Incarnation(),
		Timeout:     timeout,
		Labels:      node.Labels().AsMap(),
	}
	res := &joinResponse{}

	// Make the request on a separate goroutine. The buffered channel lets
	// the goroutine complete its send even if we stop waiting on timeout,
	// so the goroutine never leaks.
	errC := make(chan error, 1)
	go func() {
		errC <- json.CallPeer(ctx, peer, node.service, "/protocol/join", req, res)
	}()

	// wait for result or timeout
	var err error
	select {
	case err = <-errC:
	case <-ctx.Done():
		err = errJoinTimeout
	}

	if err != nil {
		logging.Logger("join").WithFields(log.Fields{
			"local": node.Address(),
			"error": err,
		}).Debug("could not complete join")

		return nil, err
	}

	// Return a literal nil on success rather than the (nil) err variable.
	return res, nil
}
// sendJoin creates a new joinSender and attempts to join the cluster defined
// by the node's bootstrap hosts, returning the addresses that were joined.
func sendJoin(node *Node, opts *joinOpts) ([]string, error) {
	sender, err := newJoinSender(node, opts)
	if err != nil {
		return nil, err
	}
	return sender.JoinCluster()
}