in pekko-sample-distributed-workers-scala/src/main/scala/worker/WorkManager.scala [43:122]
def apply(workTimeout: FiniteDuration): Behavior[Command] =
Behaviors.setup { ctx =>
implicit val timeout = Timeout(5.seconds)
val producerController =
ctx.spawn(WorkPullingProducerController[WorkerCommand]("work-manager", ManagerServiceKey, None),
"producer-controller")
val requestNextAdapter = ctx.messageAdapter(RequestNextWrapper)
producerController ! WorkPullingProducerController.Start(requestNextAdapter)
var requestNext: Option[RequestNext[WorkerCommand]] = None
def tryStartWork(workState: WorkState): Effect[WorkDomainEvent, WorkState] = {
if (workState.hasWork) {
requestNext match {
case Some(next) =>
val work = workState.nextWork
ctx.ask[MessageWithConfirmation[WorkerCommand], Done](next.askNextTo,
done => MessageWithConfirmation(DoWork(work), done)) {
case Success(Done) =>
WorkIsDone(work.workId)
case Failure(t) =>
ctx.log.error("Work failed", t)
WorkFailed(work.workId, t)
}
requestNext = None
Effect.persist(WorkStarted(work.workId))
case _ =>
Effect.none
}
} else {
Effect.none
}
}
EventSourcedBehavior[Command, WorkDomainEvent, WorkState](
persistenceId = PersistenceId.ofUniqueId("master"),
emptyState = WorkState.empty,
commandHandler = (workState, command) => {
command match {
case RequestNextWrapper(rn) =>
ctx.log.info("work request: {}")
if (requestNext.isDefined) {
throw new IllegalStateException(s"Request next when there is already demand ${rn}, ${requestNext}")
}
requestNext = Some(rn)
tryStartWork(workState)
case TryStartWork =>
tryStartWork(workState)
case ResetWorkInProgress =>
Effect.persist(WorkInProgressReset)
case WorkIsDone(workId) =>
Effect.persist[WorkDomainEvent, WorkState](WorkCompleted(workId)).thenRun { newState =>
ctx.log.info("Work is done {}. New state {}", workId, newState)
}
case WorkFailed(id, reason) =>
ctx.log.info("Work failed {} {}", id, reason)
tryStartWork(workState)
case work: SubmitWork =>
// idempotent
if (workState.isAccepted(work.work.workId)) {
work.replyTo ! WorkManager.Ack(work.work.workId)
Effect.none
} else {
ctx.log.info("Accepted work: {}", work.work.workId)
Effect.persist(WorkAccepted(work.work)).thenRun { _ =>
// Ack back to original sender
work.replyTo ! WorkManager.Ack(work.work.workId)
ctx.self ! TryStartWork
}
}
}
},
eventHandler = (workState, event) => workState.updated(event)).receiveSignal {
case (state, RecoveryCompleted) =>
// Any in progress work from the previous incarnation is retried
ctx.self ! ResetWorkInProgress
}
}