in core/src/main/scala/org/apache/pekko/projection/ProjectionBehavior.scala [249:279]
/**
 * Behavior used while a pause/resume request is being applied.
 *
 * - On `Stopped`: asks `mgmt` to store the requested paused flag and pipes the outcome back to
 *   this actor as either `SetPausedResult` or `ManagementOperationException`.
 * - On `SetPausedResult`: runs the projection, replies `Done` to the requester and switches to
 *   `started`, replaying everything that was stashed.
 * - On `ManagementOperationException`: logs a warning and runs the projection anyway, without
 *   sending a reply.
 * - Every other message is stashed until one of the two outcomes above arrives.
 *
 * @param setPaused the request being handled; supplies the paused flag and the reply target
 * @param mgmt management handle whose `setPaused` is invoked once `Stopped` is received
 */
private def settingPaused(setPaused: SetPaused, mgmt: RunningProjectionManagement[_]): Behavior[Command] = {

  // Runs the projection, invokes `afterStart`, then moves to `started` and replays the stash.
  // The callback is invoked after `run()` and before unstashing.
  def startProjection(afterStart: () => Unit): Behavior[Command] = {
    val handle = projection.run()(context.system)
    afterStart()
    stashBuffer.unstashAll(started(handle))
  }

  Behaviors.receiveMessage {
    case Stopped =>
      context.log.debug("Projection [{}] stopped", projectionId)
      val pausedUpdate = mgmt.setPaused(setPaused.paused)
      context.pipeToSelf(pausedUpdate) {
        case Failure(cause) => ManagementOperationException(setPaused, cause)
        case Success(_)     => SetPausedResult(setPaused.replyTo)
      }
      Behaviors.same

    case SetPausedResult(requester) =>
      val mode = if (setPaused.paused) "paused" else "resumed"
      context.log.info2("Starting projection [{}] in {} mode.", projection.projectionId, mode)
      startProjection(() => requester ! Done)

    case ManagementOperationException(failedOp, cause) =>
      context.log.warn2("Operation [{}] failed.", failedOp, cause)
      // The flag could not be stored: start anyway, but deliberately send no reply.
      startProjection(() => ())

    case deferred =>
      // Not relevant until the projection is running again; keep it for later.
      stashBuffer.stash(deferred)
      Behaviors.same
  }
}