in Sources/DistributedActors/Cluster/ClusterShellState.swift [257:345]
/// Decides how to react to a handshake offer that arrived from a remote node on `incomingChannel`.
///
/// The decision is made in three steps:
/// 1. If an association with the offering node already exists and is `.associated` or `.tombstone`,
///    the offer is rejected; an `.associating` association falls through to the checks below.
/// 2. If no handshake with that node is tracked in `_handshakes`, a `HandshakeOfferReceivedState` is
///    created, stored as `.wasOfferedHandshake`, and negotiation proceeds.
/// 3. If we had *initiated* a handshake with that node ourselves, both sides dialed each other
///    concurrently and a tie-breaker on node ordering picks which connection is kept.
///
/// - Parameters:
///   - offer: the handshake offer received from the remote node.
///   - existingAssociation: the association currently known for the offering node, if any.
///   - incomingChannel: the channel the offer arrived on; in this method it is only used for log metadata.
/// - Returns: `.negotiateIncoming` carrying the freshly stored offer-received state machine, or
///   `.abortIncomingHandshake` carrying the reason the incoming connection should be terminated.
mutating func onIncomingHandshakeOffer(offer: Wire.HandshakeOffer, existingAssociation: Association?, incomingChannel: Channel) -> OnIncomingHandshakeOfferDirective {
    let remote = offer.originNode

    /// Stores a fresh offer-received state machine for the remote node (replacing whatever was tracked
    /// for it) and tells the caller to continue negotiating on the incoming connection.
    func beginIncomingNegotiation() -> OnIncomingHandshakeOfferDirective {
        let offerReceived = HandshakeStateMachine.HandshakeOfferReceivedState(state: self, offer: offer)
        self._handshakes[remote.node] = .wasOfferedHandshake(offerReceived)
        return .negotiateIncoming(offerReceived)
    }

    /// Builds the directive that terminates the incoming connection with the given explanation.
    func rejectIncoming(_ message: String) -> OnIncomingHandshakeOfferDirective {
        let error = HandshakeStateMachine.HandshakeConnectionError(node: remote.node, message: message)
        return .abortIncomingHandshake(error)
    }

    if let association = existingAssociation {
        switch association.state {
        case .associated:
            return rejectIncoming("Terminating this connection, the node [\(remote)] is already associated. Possibly a delayed handshake retry message was delivered?")
        case .tombstone:
            return rejectIncoming("Terminating this connection, the node [\(remote)] is already tombstone-ed. Possibly a delayed handshake retry message was delivered?")
        case .associating:
            break // still associating; any concurrent handshake is resolved by the tie-breaker below
        }
    }

    guard let tracked = self._handshakes[remote.node] else {
        // Happy path: no other handshake with this node is in progress, so simply negotiate.
        return beginIncomingNegotiation()
    }

    switch tracked {
    case .wasOfferedHandshake:
        // Suspicious but not wrong: this node already offered us a handshake and is offering again.
        // Possibly the remote re-sent its offer before receiving our accept, or it never received our
        // accept/reject and is retrying. Either way, restart negotiation with the newest offer.
        return beginIncomingNegotiation()

    case .initiated(let initiated):
        // We MAY now have two connections open to the same node: the one we opened, and the one the remote
        // opened to us while trying to associate. A tie-breaker picks exactly one of them predictably and
        // the other is closed. The node ordering is somewhat arbitrary, which is fine for a tie-breaker.
        let localWins = initiated.localNode < remote
        let verdict = localWins ? "WON (will negotiate and reply)" : "LOST (will await reply)"
        self.log.debug("""
        Concurrently initiated handshakes from nodes [\(initiated.localNode)](local) and [\(remote)](remote) \
        detected! Resolving race by address ordering; This node \(verdict) tie-break.
        """, metadata: [
            "handshake/inProgress": "\(initiated)",
            "handshake/incoming/offer": "\(offer)",
            "handshake/incoming/channel": "\(incomingChannel)",
        ])

        guard localWins else {
            // We lost: the remote node is expected to send the accept over the other connection, and that
            // accept completes the handshake we initiated. This incoming connection is therefore redundant.
            return rejectIncoming("""
            Terminating this connection, as there is a concurrently established connection with same host [\(remote)] \
            which will be used to complete the handshake.
            """)
        }

        // We won: close the outbound handshake channel we opened and negotiate on the incoming one instead.
        // The debug line is only emitted when `closeOutboundHandshakeChannel` returns non-nil.
        if self.closeOutboundHandshakeChannel(with: remote.node) != nil {
            self.log.debug(
                "Aborted handshake, as concurrently negotiating another one with same node already",
                metadata: [
                    "handshake/status": "abort-incoming,offer",
                    "handshake/from": "\(remote)",
                ]
            )
        }
        return beginIncomingNegotiation()

    // These states are never supposed to be stored in `_handshakes`; reaching them indicates a bug.
    case .inFlight(let inFlight):
        fatalError("inFlight state [\(inFlight)] should never have been stored as handshake state; This is likely a bug, please open an issue.")
    case .completed(let completed):
        fatalError("completed state [\(completed)] should never have been stored as handshake state; This is likely a bug, please open an issue.")
    }
}