in Sources/DistributedActors/Cluster/SWIM/SWIMActorShell.swift [270:356]
/// Asks every peer in `directive.requestDetails` to ping `directive.target` on our behalf (indirect probing).
///
/// Each individual reply (or failure) is reported via `handleEveryPingRequestResponse`.
/// The overall outcome is reported once via `handlePingRequestResponse`:
/// - the first `.ack` received through any peer is forwarded as-is;
/// - if no `.ack` is received, a `.timeout` response for the target is reported instead.
///   This covers two cases: every reply was a non-ack response or a failure, or there was no peer to ask.
///
/// - Parameters:
///   - directive: the target to probe, the peers to probe it through (each with its own payload and
///     sequence number), and the timeout passed to every individual ping request.
///   - context: the actor context; reply handling is scheduled onto it via `onResultAsync`.
func sendPingRequests(_ directive: SWIM.Instance.SendPingRequestDirective, context: MyselfContext) {
    /// Used to fail `firstSuccessful` when no ping request produced an `.ack`.
    struct NoAckReceivedError: Error, CustomStringConvertible {
        let pingRequestCount: Int
        var description: String {
            "NoAckReceivedError(pingRequestCount: \(self.pingRequestCount))"
        }
    }

    // We are only interested in successful pings, as a single success tells us the node is
    // still alive. Therefore we propagate only the first success (an `ack`) through `firstSuccessful`.
    // If all ping requests complete without an `ack`, `firstSuccessful` is failed after the last reply
    // has been handled. Each individual ping request is bounded by `pingTimeout`.
    // Without this, the promise would never complete, and the `.timeout` handling at the end of this
    // method would never run.
    let eventLoop = context.system._eventLoopGroup.next()
    let firstSuccessful = eventLoop.makePromise(of: SWIM.PingResponse.self)
    let pingTimeout = directive.timeout
    let peerToPing = directive.target

    let startedSendingPingRequestsSentAt: DispatchTime = .now()
    let pingRequestResponseTimeFirstTimer = self.swim.metrics.shell.pingRequestResponseTimeFirst
    firstSuccessful.futureResult.whenComplete { result in
        switch result {
        case .success: pingRequestResponseTimeFirstTimer.recordInterval(since: startedSendingPingRequestsSentAt)
        case .failure: ()
        }
    }

    // Bookkeeping used to detect that no `ack` arrived.
    // Writers:
    // - this method body, which increments the counters while sending;
    // - the per-reply `onResultAsync` continuations, which are delivered to this actor and therefore
    //   only run after this method has returned, one at a time.
    var sentPingRequests = 0
    var pendingReplies = 0
    var receivedAck = false

    for pingRequest in directive.requestDetails {
        let peerToPingRequestThrough = pingRequest.peerToPingRequestThrough
        let payload = pingRequest.payload
        let sequenceNumber = pingRequest.sequenceNumber

        context.log.trace("Sending ping request for [\(peerToPing)] to [\(peerToPingRequestThrough)] with payload: \(payload)")

        let pingRequestSentAt: DispatchTime = .now()
        let eachReplyPromise = eventLoop.makePromise(of: SWIM.PingResponse.self)

        sentPingRequests += 1
        pendingReplies += 1

        self.metrics.shell.messageOutboundCount.increment()
        peerToPingRequestThrough.pingRequest(target: peerToPing, payload: payload, timeout: pingTimeout, sequenceNumber: sequenceNumber, context: context) { result in
            eachReplyPromise.completeWith(result)
        }

        context.onResultAsync(of: eachReplyPromise.futureResult, timeout: .effectivelyInfinite) { result in
            pendingReplies -= 1

            switch result {
            case .success(let response):
                self.metrics.shell.pingRequestResponseTimeAll.recordInterval(since: pingRequestSentAt)
                self.handleEveryPingRequestResponse(response: response, pinged: peerToPing, context: context)

                if case .ack = response {
                    // We only cascade successful ping responses (i.e. `ack`s).
                    //
                    // While this has a slight timing implication on the timeout of the pings -- the node that is last
                    // in the list that we ping, has slightly less time to fulfil the "total ping timeout"; as we set a total timeout on the entire `firstSuccess`.
                    // In practice those timeouts will be relatively large (seconds) and the few millis here should not have a large impact on correctness.
                    receivedAck = true
                    firstSuccessful.succeed(response)
                }

            case .failure(let error):
                context.log.debug(".pingRequest resulted in error", metadata: self.swim.metadata([
                    "swim/pingRequest/target": "\(peerToPing)",
                    "swim/pingRequest/peerToPingRequestThrough": "\(peerToPingRequestThrough)",
                    "swim/pingRequest/sequenceNumber": "\(sequenceNumber)",
                    "error": "\(error)",
                ]))
                self.handleEveryPingRequestResponse(
                    response: .timeout(
                        target: peerToPing,
                        pingRequestOrigin: context.myself,
                        timeout: pingTimeout,
                        sequenceNumber: sequenceNumber
                    ),
                    pinged: peerToPing,
                    context: context
                )
                // these are generally harmless thus we do not want to log them on higher levels
                context.log.trace("Failed pingRequest", metadata: [
                    "swim/target": "\(peerToPing)",
                    "swim/payload": "\(payload)",
                    "swim/pingTimeout": "\(pingTimeout)",
                    "error": "\(error)",
                ])
            }

            if pendingReplies == 0, !receivedAck {
                // Every ping request has completed and none of them produced an `ack`.
                // Complete the overall operation as failed, so the `.timeout` is reported below.
                firstSuccessful.fail(NoAckReceivedError(pingRequestCount: sentPingRequests))
            }
            return .same
        }
    }

    if sentPingRequests == 0 {
        // No peers to ask, so no `ack` can ever arrive. Fail right away instead of leaving
        // `firstSuccessful` (and the continuation below) pending forever.
        firstSuccessful.fail(NoAckReceivedError(pingRequestCount: 0))
    }

    context.onResultAsync(of: firstSuccessful.futureResult, timeout: .effectivelyInfinite) { result in
        switch result {
        case .success(let response):
            self.handlePingRequestResponse(response: response, pinged: peerToPing, context: context)
        case .failure(let error):
            context.log.debug("Failed to sendPingRequests", metadata: [
                "error": "\(error)",
            ])
            self.handlePingRequestResponse(response: .timeout(target: peerToPing, pingRequestOrigin: context.myself, timeout: pingTimeout, sequenceNumber: 0), pinged: peerToPing, context: context) // FIXME: that sequence number...
        }
        return .same
    }
}