func forwarding()

in Sources/DistributedActors/Pattern/WorkerPool.swift [137:197]


    /// Behavior that distributes incoming `.forward` messages across `workers` in round-robin order.
    ///
    /// - Watches every worker so `Terminated` signals can eagerly shrink the pool, rather than
    ///   waiting for the (potentially much later) receptionist listing update.
    /// - On a `.listing` update, switches to a fresh `forwarding` behavior over the new worker set
    ///   (sorted by address for deterministic ordering); an empty listing transitions back to
    ///   `awaitingWorkers`.
    /// - When the last watched worker terminates, either awaits new workers or crashes with the
    ///   configured error, according to `settings.whenAllWorkersTerminated`.
    ///
    /// - Parameter workers: the current, non-empty set of worker refs to distribute work over.
    func forwarding(to workers: [_ActorRef<Message>]) -> PoolBehavior {
        .setup { context in
            // TODO: would be some actual logic, that we can plug and play
            var _roundRobinPos = 0

            // Round-robin selection over the captured `workers` array; position wraps around.
            func selectWorker() -> _ActorRef<Message> {
                let worker = workers[_roundRobinPos]
                _roundRobinPos = (_roundRobinPos + 1) % workers.count
                return worker
            }

            let _forwarding: _Behavior<WorkerPoolMessage<Message>> = .receive { context, poolMessage in
                switch poolMessage {
                case .forward(let message):
                    let selected = selectWorker()
                    context.log.log(level: self.settings.logLevel, "Forwarding [\(message)] to [\(selected)]")
                    selected.tell(message)
                    return .same

                case .listing(let listing):
                    guard !listing.refs.isEmpty else {
                        context.log.log(level: self.settings.logLevel, "Worker pool downsized to zero members, becoming `awaitingWorkers`.")
                        return self.awaitingWorkers()
                    }

                    var newWorkers = Array(listing.refs) // TODO: smarter logic here, remove dead ones etc; keep stable round robin while new listing arrives
                    // Sort by address so every pool member derives the same round-robin order.
                    newWorkers.sort { l, r in l.address.description < r.address.description }

                    context.log.log(level: self.settings.logLevel, "Active workers: \(newWorkers.count)")
                    // TODO: if no more workers may want to issue warnings or timeouts
                    return self.forwarding(to: newWorkers)
                }
            }

            // While we would remove terminated workers thanks to a new Listing arriving in any case,
            // the listing can arrive much later than a direct Terminated message - allowing for a longer
            // time window in which we are under risk of forwarding work to an already dead actor.
            //
            // In order to make this time window smaller, we explicitly watch and remove any workers we are forwarding to.
            for worker in workers {
                context.watch(worker)
            }

            let eagerlyRemoveTerminatedWorkers: _Behavior<WorkerPoolMessage<Message>> =
                .receiveSpecificSignal(Signals.Terminated.self) { _, terminated in
                    var remainingWorkers = workers
                    remainingWorkers.removeAll { ref in ref.address == terminated.address } // TODO: removeFirst is enough, but has no closure version

                    if !remainingWorkers.isEmpty {
                        return self.forwarding(to: remainingWorkers)
                    } else {
                        switch self.settings.whenAllWorkersTerminated {
                        case .awaitNewWorkers:
                            return self.awaitingWorkers()
                        case .crash(let error):
                            throw error
                        }
                    }
                }

            return _forwarding.orElse(eagerlyRemoveTerminatedWorkers)
        }
    }