in pekko-sample-cluster-scala/src/main/scala/sample/cluster/transformation/Frontend.scala [38:68]
private def running(
ctx: ActorContext[Event], workers: IndexedSeq[ActorRef[Worker.TransformText]], jobCounter: Int): Behavior[Event] =
Behaviors.receiveMessage {
case WorkersUpdated(newWorkers) =>
ctx.log.info("List of services registered with the receptionist changed: {}", newWorkers)
running(ctx, newWorkers.toIndexedSeq, jobCounter)
case Tick =>
if (workers.isEmpty) {
ctx.log.warn("Got tick request but no workers available, not sending any work")
Behaviors.same
} else {
// how much time can pass before we consider a request failed
implicit val timeout: Timeout = 5.seconds
val selectedWorker = workers(jobCounter % workers.size)
ctx.log.info("Sending work for processing to {}", selectedWorker)
val text = s"hello-$jobCounter"
ctx.ask(selectedWorker, Worker.TransformText(text, _)) {
case Success(transformedText) => TransformCompleted(transformedText.text, text)
case Failure(ex) => JobFailed("Processing timed out", text)
}
running(ctx, workers, jobCounter + 1)
}
case TransformCompleted(originalText, transformedText) =>
ctx.log.info("Got completed transform of {}: {}", originalText, transformedText)
Behaviors.same
case JobFailed(why, text) =>
ctx.log.warn("Transformation of text {} failed. Because: {}", text, why)
Behaviors.same
}