func channelRead()

in Sources/DistributedActors/Cluster/Transport/TransportPipelines.swift [125:167]


    func channelRead(context: ChannelHandlerContext, data: NIOAny) {
        do {
            var bytes = self.unwrapInboundIn(data)
            let offer = try self.readHandshakeOffer(bytes: &bytes)

            let metadata: Logger.Metadata = [
                "handshake/channel": "\(context.channel)",
            ]

            self.log.debug("Received handshake offer from: [\(reflecting: offer.originNode)] with protocol version: [\(offer.version)]", metadata: metadata)

            let promise = context.eventLoop.makePromise(of: Wire.HandshakeResponse.self)
            self.cluster.tell(.inbound(.handshakeOffer(offer, channel: context.channel, handshakeReplyTo: promise)))

            promise.futureResult.whenComplete { res in
                switch res {
                case .success(.accept(let accept)):
                    do {
                        self.log.debug("Write accept handshake to: [\(offer.originNode)]", metadata: metadata)
                        try self.writeHandshakeAccept(context, accept)
                    } catch {
                        self.log.error("Failed when sending Handshake.Accept: \(accept), error: \(error)")
                        context.fireErrorCaught(error)
                    }

                case .success(.reject(let reject)):
                    do {
                        self.log.debug("Write reject handshake offer to: [\(offer.originNode)] reason: [\(reject.reason)]", metadata: metadata)
                        try self.writeHandshakeReject(context, reject)
                    } catch {
                        self.log.error("Failed when writing \(reject), error: \(error)", metadata: metadata)
                        context.fireErrorCaught(error)
                    }

                case .failure(let err):
                    context.fireErrorCaught(err)
                    // _ = context.close() // TODO: maybe?
                }
            }
        } catch {
            context.fireErrorCaught(error)
        }
    }