mutating func onIncomingHandshakeOffer()

in Sources/DistributedActors/Cluster/ClusterShellState.swift [257:345]


    mutating func onIncomingHandshakeOffer(offer: Wire.HandshakeOffer, existingAssociation: Association?, incomingChannel: Channel) -> OnIncomingHandshakeOfferDirective {
        func prepareNegotiation0() -> OnIncomingHandshakeOfferDirective {
            let fsm = HandshakeStateMachine.HandshakeOfferReceivedState(state: self, offer: offer)
            self._handshakes[offer.originNode.node] = .wasOfferedHandshake(fsm)
            return .negotiateIncoming(fsm)
        }

        if let assoc = existingAssociation {
            switch assoc.state {
            case .associating:
                () // continue, we'll perform the tie-breaker logic below
            case .associated:
                let error = HandshakeStateMachine.HandshakeConnectionError(
                    node: offer.originNode.node,
                    message: "Terminating this connection, the node [\(offer.originNode)] is already associated. Possibly a delayed handshake retry message was delivered?"
                )
                return .abortIncomingHandshake(error)
            case .tombstone:
                let error = HandshakeStateMachine.HandshakeConnectionError(
                    node: offer.originNode.node,
                    message: "Terminating this connection, the node [\(offer.originNode)] is already tombstone-ed. Possibly a delayed handshake retry message was delivered?"
                )
                return .abortIncomingHandshake(error)
            }
        }

        guard let inProgress = self._handshakes[offer.originNode.node] else {
            // no other concurrent handshakes in progress; good, this is happy path, so we simply continue our negotiation
            return prepareNegotiation0()
        }

        switch inProgress {
        case .initiated(let initiated):

            /// Since we MAY have 2 connections open at this point in time -- one we opened, and another that was opened
            /// to us when the other node tried to associated, we'll perform a tie-breaker to ensure we predictably
            /// only use _one_ of them, and close the other.
            // let selectedChannel: Channel

            /// order on nodes is somewhat arbitrary, but that is fine, since we only need this for tiebreakers
            let tieBreakWinner = initiated.localNode < offer.originNode
            self.log.debug("""
            Concurrently initiated handshakes from nodes [\(initiated.localNode)](local) and [\(offer.originNode)](remote) \
            detected! Resolving race by address ordering; This node \(tieBreakWinner ? "WON (will negotiate and reply)" : "LOST (will await reply)") tie-break. 
            """, metadata: [
                "handshake/inProgress": "\(initiated)",
                "handshake/incoming/offer": "\(offer)",
                "handshake/incoming/channel": "\(incomingChannel)",
            ])

            if tieBreakWinner {
                if self.closeOutboundHandshakeChannel(with: offer.originNode.node) != nil {
                    self.log.debug(
                        "Aborted handshake, as concurrently negotiating another one with same node already",
                        metadata: [
                            "handshake/status": "abort-incoming,offer",
                            "handshake/from": "\(offer.originNode)",
                        ]
                    )
                }

                return prepareNegotiation0()

            } else {
                // we "lost", the other node will send the accept; when it does, the will complete the future.
                // concurrent handshake and we should abort
                let error = HandshakeStateMachine.HandshakeConnectionError(
                    node: offer.originNode.node,
                    message: """
                    Terminating this connection, as there is a concurrently established connection with same host [\(offer.originNode)] \
                    which will be used to complete the handshake.
                    """
                )
                return .abortIncomingHandshake(error)
            }
        case .wasOfferedHandshake:
            // suspicious but but not wrong, so we were offered before, and now are being offered again?
            // Situations:
            // - it could be that the remote re-sent their offer before it received our accept?
            // - maybe remote did not receive our accept/reject and is trying again?
            return prepareNegotiation0()

        // --- these are never stored ----
        case .inFlight(let inFlight):
            fatalError("inFlight state [\(inFlight)] should never have been stored as handshake state; This is likely a bug, please open an issue.")
        case .completed(let completed):
            fatalError("completed state [\(completed)] should never have been stored as handshake state; This is likely a bug, please open an issue.")
        }
    }