in swim/join_sender.go [365:436]
func (j *joinSender) JoinGroup(nodesJoined []string) ([]string, []string) {
group := j.SelectGroup(nodesJoined)
var responses struct {
successes []string
failures []string
sync.Mutex
}
var numNodesLeft = j.size - len(nodesJoined)
var startTime = time.Now()
var wg sync.WaitGroup
j.numTries++
j.node.EmitEvent(JoinTriesUpdateEvent{j.numTries})
for _, target := range group {
wg.Add(1)
go func(target string) {
defer wg.Done()
res, err := sendJoinRequest(j.node, target, j.timeout)
if err != nil {
msg := "attempt to join node failed"
if err == errJoinTimeout {
msg = "attempt to join node timed out"
}
j.logger.WithFields(log.Fields{
"remote": target,
"timeout": j.timeout,
}).Debug(msg)
responses.Lock()
responses.failures = append(responses.failures, target)
responses.Unlock()
return
}
responses.Lock()
responses.successes = append(responses.successes, target)
responses.Unlock()
start := time.Now()
j.node.memberlist.AddJoinList(res.Membership)
j.node.EmitEvent(AddJoinListEvent{
Duration: time.Now().Sub(start),
})
}(target)
}
// wait for joins to complete
wg.Wait()
// don't need to lock successes/failures since we're finished writing to them
j.logger.WithFields(log.Fields{
"groupSize": len(group),
"joinSize": j.size,
"joinTime": time.Now().Sub(startTime),
"numNodesLeft": numNodesLeft,
"numFailures": len(responses.failures),
"failures": responses.failures,
"numSuccesses": len(responses.successes),
"successes": responses.successes,
}).Debug("join group complete")
return responses.successes, responses.failures
}