in Sources/DistributedActors/Cluster/Transport/TransportPipelines.swift [125:167]
func channelRead(context: ChannelHandlerContext, data: NIOAny) {
do {
var bytes = self.unwrapInboundIn(data)
let offer = try self.readHandshakeOffer(bytes: &bytes)
let metadata: Logger.Metadata = [
"handshake/channel": "\(context.channel)",
]
self.log.debug("Received handshake offer from: [\(reflecting: offer.originNode)] with protocol version: [\(offer.version)]", metadata: metadata)
let promise = context.eventLoop.makePromise(of: Wire.HandshakeResponse.self)
self.cluster.tell(.inbound(.handshakeOffer(offer, channel: context.channel, handshakeReplyTo: promise)))
promise.futureResult.whenComplete { res in
switch res {
case .success(.accept(let accept)):
do {
self.log.debug("Write accept handshake to: [\(offer.originNode)]", metadata: metadata)
try self.writeHandshakeAccept(context, accept)
} catch {
self.log.error("Failed when sending Handshake.Accept: \(accept), error: \(error)")
context.fireErrorCaught(error)
}
case .success(.reject(let reject)):
do {
self.log.debug("Write reject handshake offer to: [\(offer.originNode)] reason: [\(reject.reason)]", metadata: metadata)
try self.writeHandshakeReject(context, reject)
} catch {
self.log.error("Failed when writing \(reject), error: \(error)", metadata: metadata)
context.fireErrorCaught(error)
}
case .failure(let err):
context.fireErrorCaught(err)
// _ = context.close() // TODO: maybe?
}
}
} catch {
context.fireErrorCaught(error)
}
}