in core/src/main/scala/org/apache/pekko/projection/internal/InternalProjectionState.scala [469:492]
/**
 * Hooks handler shutdown and stop/failure notifications onto the termination of `src`.
 *
 * Inside the `watchTermination` callback the handler strategy is first told to recreate its
 * handler on next access. Once the stream's termination future completes (successfully or not),
 * `handlerLifecycle.tryStop()` is invoked, and afterwards `telemetry` and `statusObserver` are
 * notified:
 *
 *  - successful completion or `AbortProjectionException`: `stopped` only;
 *  - any other failure: `failed` followed by `stopped`, for telemetry first and the observer second.
 *
 * @param src the projection stream whose termination should be observed
 * @param handlerLifecycle the lifecycle whose `tryStop()` is invoked when the stream terminates
 * @return `src` with its materialized value replaced by the future produced by the callback chain
 */
private def stopHandlerOnTermination(
    src: Source[Done, NotUsed],
    handlerLifecycle: HandlerLifecycle): Source[Done, Future[Done]] = {
  src
    .watchTermination() { (_, futDone) =>
      handlerStrategy.recreateHandlerOnNextAccess()
      futDone
        // NOTE(review): `andThen` discards the value returned by tryStop(); if tryStop() is
        // asynchronous the notifications below may run before it has finished - confirm.
        .andThen { case _ => handlerLifecycle.tryStop() }
        .andThen {
          case Success(_) | Failure(AbortProjectionException) =>
            // Normal completion and a deliberate abort are both reported as a plain stop
            // (an abort is not a failure: no restart).
            // `finally` guarantees the observer is notified even when the telemetry callback
            // throws, while the telemetry exception still escapes this callback as before.
            try telemetry.stopped()
            finally statusObserver.stopped(projectionId)
          case Failure(exc) =>
            // For observer and telemetry, invoke failed first and stopped second.
            // Each call is isolated in a Try so one throwing callback cannot skip the others.
            Try(telemetry.failed(exc))
            Try(telemetry.stopped())
            Try(statusObserver.failed(projectionId, exc))
            Try(statusObserver.stopped(projectionId))
        }
    }
}