in core/src/main/scala/org/apache/pekko/projection/ProjectionBehavior.scala [118:198]
/**
 * Message handler used while this actor holds a `RunningProjection` handle.
 *
 * It reacts to `Stop`, to the offset management commands (`GetOffset`, `SetOffset`),
 * to the pause management commands (`IsPaused`, `SetPaused`), and to the internal
 * messages that bring the outcome of those asynchronous calls back to this actor.
 *
 * A management command is reported as `Behaviors.unhandled` when `running` is not a
 * `RunningProjectionManagement`; it is left without effect when it carries a
 * `projectionId` different from this actor's `projectionId`.
 *
 * @param running handle used to stop the projection and, when it also is a
 *                `RunningProjectionManagement`, to serve management commands
 * @return the behavior handling the messages listed above
 */
private def started(running: RunningProjection): Behavior[Command] =
  Behaviors.receiveMessagePartial[Command] {
    case Stop =>
      context.log.debug("Projection [{}] is being stopped", projectionId)
      // Stopped is sent to self no matter how the stop future completes (Success or
      // Failure): either way the internal stream is stopped by then.
      context.pipeToSelf(running.stop())(_ => Stopped)
      stopping()

    case query: GetOffset[Offset @unchecked] =>
      running match {
        case management: RunningProjectionManagement[Offset @unchecked] =>
          val addressedToThisProjection = query.projectionId == projectionId
          if (addressedToThisProjection) {
            // The lookup is asynchronous; its outcome comes back as a message to self.
            val pendingOffset = management.getOffset()
            context.pipeToSelf(pendingOffset) {
              case Failure(cause)  => ManagementOperationException(query, cause)
              case Success(offset) => GetOffsetResult(offset, query.replyTo)
            }
          }
          Behaviors.same
        case _ =>
          Behaviors.unhandled
      }

    case offsetResult: GetOffsetResult[Offset @unchecked] =>
      receiveGetOffsetResult(offsetResult)

    case request: SetOffset[Offset @unchecked] =>
      running match {
        case management: RunningProjectionManagement[Offset @unchecked]
            if request.projectionId == projectionId =>
          context.log.info2(
            "Offset will be changed to [{}] for projection [{}]. The Projection will be restarted.",
            request.offset,
            projectionId)
          // Stop first; the Stopped notification is handled by the next behavior.
          context.pipeToSelf(running.stop())(_ => Stopped)
          settingOffset(request, management)
        case _: RunningProjectionManagement[_] =>
          Behaviors.same // not for this projectionId
        case _ =>
          Behaviors.unhandled
      }

    case ManagementOperationException(operation, cause) =>
      context.log.warn2("Operation [{}] failed with: {}", operation, cause)
      Behaviors.same

    case query: IsPaused =>
      running match {
        case management: RunningProjectionManagement[_] =>
          val addressedToThisProjection = query.projectionId == projectionId
          if (addressedToThisProjection) {
            // The lookup is asynchronous; its outcome comes back as a message to self.
            val pendingState = management.getManagementState()
            context.pipeToSelf(pendingState) {
              case Failure(cause)           => ManagementOperationException(query, cause)
              case Success(managementState) => GetManagementStateResult(managementState, query.replyTo)
            }
          }
          Behaviors.same
        case _ =>
          Behaviors.unhandled
      }

    case GetManagementStateResult(managementState, replyTo) =>
      // Reply with true only when a state is present and it is flagged as paused.
      val paused = managementState.exists(_.paused)
      replyTo ! paused
      Behaviors.same

    case request: SetPaused =>
      running match {
        case management: RunningProjectionManagement[_] if request.projectionId == projectionId =>
          val targetState = if (request.paused) "paused" else "resumed"
          context.log.info2(
            "Running state will be changed to [{}] for projection [{}].",
            targetState,
            projectionId)
          // Stop first; the Stopped notification is handled by the next behavior.
          context.pipeToSelf(running.stop())(_ => Stopped)
          settingPaused(request, management)
        case _: RunningProjectionManagement[_] =>
          Behaviors.same // not for this projectionId
        case _ =>
          Behaviors.unhandled
      }
  }