in swim/join_sender.go [282:360]
// JoinCluster repeatedly calls JoinGroup until at least j.size nodes have
// been joined, the node is destroyed, or more than j.maxJoinDuration has
// elapsed since the call started.
//
// Outcomes:
//   - If util.SingleNodeCluster reports true for this node's address and the
//     bootstrap hosts, it returns immediately with no nodes and no error, and
//     emits no event.
//   - If the node is destroyed, it emits a JoinFailedEvent with reason
//     Destroyed and returns a nil slice together with an error.
//   - If the time budget is exceeded, it emits a JoinFailedEvent with reason
//     Error and returns the nodes joined so far together with the error.
//   - Otherwise it emits a JoinCompleteEvent and returns the joined nodes.
//
// The time budget is only checked after a JoinGroup attempt has returned, so
// the total run time can exceed j.maxJoinDuration by one attempt.
func (j *joinSender) JoinCluster() ([]string, error) {
	var (
		joined []string // every success reported by JoinGroup so far
		groups int      // number of JoinGroup attempts made
		failed int      // total failures reported by JoinGroup
	)
	began := time.Now()

	if util.SingleNodeCluster(j.node.address, j.bootstrapHostsMap) {
		j.logger.Info("got single node cluster to join")
		return joined, nil
	}

	// The loop has no condition and no break: every exit is an explicit return.
	for {
		if j.node.Destroyed() {
			j.node.EmitEvent(JoinFailedEvent{
				Reason: Destroyed,
				Error:  nil,
			})
			return nil, errors.New("node destroyed while attempting to join cluster")
		}

		// Attempt one more group, passing along the nodes joined so far.
		succeeded, failedNodes := j.JoinGroup(joined)
		joined = append(joined, succeeded...)
		failed += len(failedNodes)
		groups++

		// Success: enough nodes have been joined.
		if len(joined) >= j.size {
			j.logger.WithFields(log.Fields{
				"joinSize":  j.size,
				"joinTime":  time.Since(began),
				"numJoined": len(joined),
				"numFailed": failed,
				"numGroups": groups,
			}).Debug("join complete")

			j.node.EmitEvent(JoinCompleteEvent{
				Duration:  time.Since(began),
				NumJoined: len(joined),
				Joined:    joined,
			})
			return joined, nil
		}

		// Failure: the time budget is exhausted; hand back the partial result.
		if elapsed := time.Since(began); elapsed > j.maxJoinDuration {
			j.logger.WithFields(log.Fields{
				"joinDuration":    elapsed,
				"maxJoinDuration": j.maxJoinDuration,
				"numJoined":       len(joined),
				"numFailed":       failed,
				"startTime":       began,
			}).Warn("max join duration exceeded")

			err := fmt.Errorf("join duration of %v exceeded max %v",
				elapsed, j.maxJoinDuration)
			j.node.EmitEvent(JoinFailedEvent{
				Reason: Error,
				Error:  err,
			})
			return joined, err
		}

		j.logger.WithFields(log.Fields{
			"joinSize":  j.size,
			"numJoined": len(joined),
			"numFailed": failed,
			"startTime": began,
		}).Debug("join not yet complete")

		// Let the configured delayer run before the next attempt.
		j.delayer.delay()
	}
}