func sendPingRequests()

in Sources/DistributedActors/Cluster/SWIM/SWIMActorShell.swift [270:356]


    /// Sends an indirect probe ("ping request") for `directive.target` through every peer listed in
    /// `directive.requestDetails` and reports the outcome of the whole round.
    ///
    /// Every individual reply is passed to `handleEveryPingRequestResponse`; a request that completes
    /// with an error is reported there as a `.timeout`. The round as a whole is reported through
    /// `handlePingRequestResponse`: with the first `.ack` that arrives, or, once every request has
    /// completed without any `.ack`, with a `.timeout` for the target.
    ///
    /// - Parameters:
    ///   - directive: supplies the target to probe, the timeout handed to each request, and the
    ///     per-intermediary request details (peer, payload, sequence number).
    ///   - context: used for logging, for obtaining an event loop, for `myself`, and for running the
    ///     result callbacks via `onResultAsync`.
    func sendPingRequests(_ directive: SWIM.Instance.SendPingRequestDirective, context: MyselfContext) {
        /// Fails the round when every ping request has completed and none of them was acknowledged.
        struct NoPingRequestAcknowledged: Error {}

        // We are only interested in successful pings, as a single success tells us the node is
        // still alive. Therefore we propagate only the first success. The round is failed explicitly
        // once the last outstanding request has completed without any `ack`; previously nothing ever
        // failed `firstSuccessful`, so the failure handling at the bottom never ran and the promise
        // was left unfulfilled.
        let eventLoop = context.system._eventLoopGroup.next()
        let firstSuccessful = eventLoop.makePromise(of: SWIM.PingResponse.self)
        let pingTimeout = directive.timeout
        let peerToPing = directive.target

        let startedSendingPingRequestsSentAt: DispatchTime = .now()
        let pingRequestResponseTimeFirstTimer = self.swim.metrics.shell.pingRequestResponseTimeFirst
        firstSuccessful.futureResult.whenComplete { result in
            switch result {
            case .success: pingRequestResponseTimeFirstTimer.recordInterval(since: startedSendingPingRequestsSentAt)
            case .failure: ()
            }
        }

        // Bookkeeping for the round; only touched from within the `onResultAsync` callbacks below.
        // NOTE(review): assumes `onResultAsync` callbacks run one at a time (the existing
        // unsynchronized use of `self` in them suggests so) -- confirm.
        // NOTE(review): with an empty `requestDetails` nothing is sent and the round is never
        // completed (unchanged behavior) -- confirm callers never pass an empty list.
        var outstandingReplies = directive.requestDetails.count
        var receivedAck = false

        for pingRequest in directive.requestDetails {
            let peerToPingRequestThrough = pingRequest.peerToPingRequestThrough
            let payload = pingRequest.payload
            let sequenceNumber = pingRequest.sequenceNumber

            context.log.trace("Sending ping request for [\(peerToPing)] to [\(peerToPingRequestThrough)] with payload: \(payload)")

            let pingRequestSentAt: DispatchTime = .now()
            let eachReplyPromise = eventLoop.makePromise(of: SWIM.PingResponse.self)

            self.metrics.shell.messageOutboundCount.increment()
            peerToPingRequestThrough.pingRequest(target: peerToPing, payload: payload, timeout: pingTimeout, sequenceNumber: sequenceNumber, context: context) { result in
                eachReplyPromise.completeWith(result)
            }
            context.onResultAsync(of: eachReplyPromise.futureResult, timeout: .effectivelyInfinite) { result in
                switch result {
                case .success(let response):
                    // we only record response times of requests that produced a response
                    self.metrics.shell.pingRequestResponseTimeAll.recordInterval(since: pingRequestSentAt)
                    self.handleEveryPingRequestResponse(response: response, pinged: peerToPing, context: context)
                    if case .ack = response {
                        // We only cascade successful ping responses (i.e. `ack`s); later `ack`s of the
                        // same round find the promise already completed.
                        receivedAck = true
                        firstSuccessful.succeed(response)
                    }
                case .failure(let error):
                    context.log.debug(".pingRequest resulted in error", metadata: self.swim.metadata([
                        "swim/pingRequest/target": "\(peerToPing)",
                        "swim/pingRequest/peerToPingRequestThrough": "\(peerToPingRequestThrough)",
                        "swim/pingRequest/sequenceNumber": "\(sequenceNumber)",
                        "error": "\(error)",
                    ]))
                    // An errored request is reported to the per-response handler as a timeout.
                    self.handleEveryPingRequestResponse(
                        response: .timeout(
                            target: peerToPing,
                            pingRequestOrigin: context.myself,
                            timeout: pingTimeout,
                            sequenceNumber: sequenceNumber
                        ),
                        pinged: peerToPing,
                        context: context
                    )
                    // these are generally harmless thus we do not want to log them on higher levels
                    context.log.trace("Failed pingRequest", metadata: [
                        "swim/target": "\(peerToPing)",
                        "swim/payload": "\(payload)",
                        "swim/pingTimeout": "\(pingTimeout)",
                        "error": "\(error)",
                    ])
                }

                // Once the last reply has been handled and nobody acknowledged the target, fail the
                // round so that the timeout handling below is triggered.
                outstandingReplies -= 1
                if outstandingReplies == 0, !receivedAck {
                    firstSuccessful.fail(NoPingRequestAcknowledged())
                }
                return .same
            }
        }

        context.onResultAsync(of: firstSuccessful.futureResult, timeout: .effectivelyInfinite) { result in
            switch result {
            case .success(let response):
                self.handlePingRequestResponse(response: response, pinged: peerToPing, context: context)
            case .failure(let error):
                context.log.debug("Failed to sendPingRequests", metadata: [
                    "error": "\(error)",
                ])
                self.handlePingRequestResponse(response: .timeout(target: peerToPing, pingRequestOrigin: context.myself, timeout: pingTimeout, sequenceNumber: 0), pinged: peerToPing, context: context) // FIXME: that sequence number...
            }
            return .same
        }
    }