in core/src/main/scala/org/apache/pekko/projection/ProjectionBehavior.scala [199:231]
/**
 * Behavior that applies a `SetOffset` management request and then starts the projection again.
 *
 * Message handling:
 *  - `Stopped`: calls `mgmt.setOffset` with the requested offset and pipes the outcome back to
 *    this actor, as `SetOffsetResult` on success or `ManagementOperationException` on failure.
 *  - `SetOffsetResult`: runs the projection, replies `Done` to the requester and replays the
 *    stashed messages into the `started` behavior.
 *  - `ManagementOperationException`: logs a warning and runs the projection anyway, without
 *    sending any reply.
 *  - any other message: stashed and handled later by `started`.
 *
 * @param setOffset the management request carrying the offset to store and the `replyTo` ref
 * @param mgmt management handle used to store the offset
 * @return the behavior described above
 */
private def settingOffset(
    setOffset: SetOffset[Offset],
    mgmt: RunningProjectionManagement[Offset]): Behavior[Command] = {

  // Shared tail of both completion paths: run the projection, evaluate `afterRun`
  // (the optional reply), then hand the stashed messages over to `started`.
  // The order run -> afterRun -> unstashAll is intentional and must be kept.
  def runAndUnstash(afterRun: => Unit): Behavior[Command] = {
    val runningProjection = projection.run()(context.system)
    afterRun
    stashBuffer.unstashAll(started(runningProjection))
  }

  Behaviors.receiveMessage {
    case Stopped =>
      // NOTE(review): presumably signals that the previously running projection has
      // stopped, so the offset can now be changed - confirm against the sender.
      context.log.debug("Projection [{}] stopped", projectionId)
      val offsetStored = mgmt.setOffset(setOffset.offset)
      context.pipeToSelf(offsetStored) {
        case Success(_)     => SetOffsetResult(setOffset.replyTo)
        case Failure(cause) => ManagementOperationException(setOffset, cause)
      }
      Behaviors.same

    case SetOffsetResult(requester) =>
      context.log.info2(
        "Starting projection [{}] after setting offset to [{}]",
        projection.projectionId,
        setOffset.offset)
      runAndUnstash(requester ! Done)

    case ManagementOperationException(operation, cause) =>
      context.log.warn2("Operation [{}] failed.", operation, cause)
      // start anyway, but no reply
      runAndUnstash(())

    case deferred =>
      // Not relevant while the offset is being set; keep it for `started`.
      stashBuffer.stash(deferred)
      Behaviors.same
  }
}