in Sources/DistributedActors/Pattern/WorkerPool.swift [137:197]
func forwarding(to workers: [_ActorRef<Message>]) -> PoolBehavior {
.setup { context in
// TODO: would be some actual logic, that we can plug and play
var _roundRobinPos = 0
func selectWorker() -> _ActorRef<Message> {
let worker = workers[_roundRobinPos]
_roundRobinPos = (_roundRobinPos + 1) % workers.count
return worker
}
let _forwarding: _Behavior<WorkerPoolMessage<Message>> = .receive { context, poolMessage in
switch poolMessage {
case .forward(let message):
let selected = selectWorker()
context.log.log(level: self.settings.logLevel, "Forwarding [\(message)] to [\(selected)]")
selected.tell(message)
return .same
case .listing(let listing):
guard !listing.refs.isEmpty else {
context.log.log(level: self.settings.logLevel, "Worker pool downsized to zero members, becoming `awaitingWorkers`.")
return self.awaitingWorkers()
}
var newWorkers = Array(listing.refs) // TODO: smarter logic here, remove dead ones etc; keep stable round robin while new listing arrives
newWorkers.sort { l, r in l.address.description < r.address.description }
context.log.log(level: self.settings.logLevel, "Active workers: \(newWorkers.count)")
// TODO: if no more workers may want to issue warnings or timeouts
return self.forwarding(to: newWorkers)
}
}
// While we would remove terminated workers thanks to a new Listing arriving in any case,
// the listing can arrive much later than a direct Terminated message - allowing for a longer
// time window in which we are under risk of forwarding work to an already dead actor.
//
// In order to make this time window smaller, we explicitly watch and remove any workers we are forwarding to.
workers.forEach { context.watch($0) }
let eagerlyRemoteTerminatedWorkers: _Behavior<WorkerPoolMessage<Message>> =
.receiveSpecificSignal(Signals.Terminated.self) { _, terminated in
var remainingWorkers = workers
remainingWorkers.removeAll { ref in ref.address == terminated.address } // TODO: removeFirst is enough, but has no closure version
if remainingWorkers.count > 0 {
return self.forwarding(to: remainingWorkers)
} else {
switch self.settings.whenAllWorkersTerminated {
case .awaitNewWorkers:
return self.awaitingWorkers()
case .crash(let error):
throw error
}
}
}
return _forwarding.orElse(eagerlyRemoteTerminatedWorkers)
}
}