in jdbc/src/main/scala/org/apache/pekko/projection/jdbc/internal/JdbcProjectionImpl.scala [230:275]
/**
 * Returns a copy of this projection that uses the given `observer` as its status observer.
 * All other fields are carried over unchanged by `copy`; this instance is not modified.
 *
 * @param observer the status observer to install on the returned projection
 */
override def withStatusObserver(observer: StatusObserver[Envelope]): JdbcProjectionImpl[Offset, Envelope, S] = {
  this.copy(statusObserver = observer)
}
/**
 * Exposes the actor handler initialisation, if any, by delegating to `handlerStrategy`.
 *
 * @tparam T type parameter of the returned `ActorHandlerInit`
 * @return whatever `handlerStrategy.actorHandlerInit` yields
 */
private[projection] def actorHandlerInit[T]: Option[ActorHandlerInit[T]] = {
  // Kept in result position so that `T` is inferred from the declared return type.
  handlerStrategy.actorHandlerInit
}
/**
 * INTERNAL API
 *
 * Builds a fresh `JdbcInternalProjectionState` from `settingsOrDefaults` and returns the
 * `RunningProjection` produced by its `newRunningInstance()`.
 * A new state object is created on every call.
 */
override private[projection] def run()(implicit system: ActorSystem[_]): RunningProjection = {
  val projectionState = new JdbcInternalProjectionState(settingsOrDefaults)
  projectionState.newRunningInstance()
}
/**
 * INTERNAL API
 *
 * Returns the projection `Source` already mapped with the user 'handler' function, but with no sink
 * attached yet. This is mainly intended for the TestKit, which attaches a TestSink to it.
 *
 * A new `JdbcInternalProjectionState` is built from `settingsOrDefaults` on every call.
 */
override private[projection] def mappedSource()(implicit system: ActorSystem[_]): Source[Done, Future[Done]] = {
  val projectionState = new JdbcInternalProjectionState(settingsOrDefaults)
  projectionState.mappedSource()
}
/**
 * `InternalProjectionState` specialisation used by this JDBC projection.
 *
 * The superclass is constructed from `projectionId`, `sourceProvider`, `offsetStrategy`,
 * `handlerStrategy` and `statusObserver` taken from the enclosing scope, together with the
 * supplied `settings`. Offset and management-state operations are forwarded to `offsetStore`,
 * which is defined outside this class.
 *
 * @param settings projection settings handed to the superclass and to the backoff wrapper
 * @param system   actor system supplying the execution context and the logger
 */
private class JdbcInternalProjectionState(settings: ProjectionSettings)(implicit val system: ActorSystem[_])
    extends InternalProjectionState[Offset, Envelope](
      projectionId,
      sourceProvider,
      offsetStrategy,
      handlerStrategy,
      statusObserver,
      settings) {

  // Futures composed in this class run on the actor system's execution context.
  implicit val executionContext: ExecutionContext = system.executionContext

  override val logger: LoggingAdapter = Logging(system.classicSystem, this.getClass)

  /** Reads the management state for `projectionId`; `true` only if a state exists and it is paused. */
  override def readPaused(): Future[Boolean] =
    for (managementState <- offsetStore.readManagementState(projectionId))
      yield managementState.exists(_.paused)

  /** Reads the stored offset for `projectionId` from `offsetStore`. */
  override def readOffsets(): Future[Option[Offset]] = {
    offsetStore.readOffset(projectionId)
  }

  /**
   * Stores `offset` for the `projectionId` passed as argument.
   * Note that the parameter shadows the identically named value from the enclosing scope.
   */
  override def saveOffset(projectionId: ProjectionId, offset: Offset): Future[Done] = {
    offsetStore.saveOffset(projectionId, offset)
  }

  /**
   * Wraps this state's `mappedSource()` with `RunningProjection.withBackoff` using `settings`
   * and hands the result, together with this state, to a new `JdbcRunningProjection`.
   */
  private[projection] def newRunningInstance(): RunningProjection = {
    val sourceWithBackoff = RunningProjection.withBackoff(() => mappedSource(), settings)
    new JdbcRunningProjection(sourceWithBackoff, this)
  }
}