in swim/ping_request_sender.go [144:208]
func sendPingRequests(node *Node, target string, size int, timeout time.Duration) <-chan interface{} {
var peerAddresses []string
peers := node.memberlist.RandomPingableMembers(size, map[string]bool{target: true})
for _, peer := range peers {
peerAddresses = append(peerAddresses, peer.Address)
}
node.EmitEvent(PingRequestsSendEvent{
Local: node.Address(),
Target: target,
Peers: peerAddresses,
})
var wg sync.WaitGroup
resC := make(chan interface{}, size)
for _, peer := range peers {
wg.Add(1)
go func(peer Member) {
defer wg.Done()
p := newPingRequestSender(node, peer.Address, target, timeout)
p.logger.WithFields(log.Fields{
"peer": peer.Address,
"target": p.target,
}).Debug("sending ping request")
var startTime = time.Now()
res, err := p.SendPingRequest()
if err != nil {
node.EmitEvent(PingRequestSendErrorEvent{
Local: node.Address(),
Target: target,
Peers: peerAddresses,
Peer: peer.Address,
})
resC <- err
return
}
node.EmitEvent(PingRequestsSendCompleteEvent{
Local: node.Address(),
Target: target,
Peers: peerAddresses,
Peer: peer.Address,
Duration: time.Now().Sub(startTime),
})
resC <- res
}(peer)
}
// wait for all sends to complete before closing channel
go func() {
wg.Wait()
close(resC)
}()
return resC
}