in projection/src/main/scala/org/apache/pekko/projection/r2dbc/internal/R2dbcProjectionImpl.scala [475:575]
/** Returns a copy of this projection whose `statusObserver` is replaced by `observer`. */
override def withStatusObserver(observer: StatusObserver[Envelope]): R2dbcProjectionImpl[Offset, Envelope] = {
  copy(statusObserver = observer)
}
/** Exposes the `actorHandlerInit` of the configured `handlerStrategy`. */
private[projection] def actorHandlerInit[T]: Option[ActorHandlerInit[T]] = {
  handlerStrategy.actorHandlerInit
}
/**
 * INTERNAL API
 *
 * Builds a new internal projection state from `settingsOrDefaults` and returns its running instance.
 */
override private[projection] def run()(implicit system: ActorSystem[_]): RunningProjection = {
  val state = new R2dbcInternalProjectionState(settingsOrDefaults)
  state.newRunningInstance()
}
/**
 * INTERNAL API
 *
 * Returns the projection Source already mapped with the user's 'handler' function, with no sink attached yet.
 * Mainly intended for the TestKit, which attaches a TestSink to it.
 */
override private[projection] def mappedSource()(implicit system: ActorSystem[_]): Source[Done, Future[Done]] = {
  val state = new R2dbcInternalProjectionState(settingsOrDefaults)
  state.mappedSource()
}
/**
 * Internal projection state for this projection. Reading the paused flag and reading/saving offsets
 * all delegate to the `offsetStore` that is in scope here.
 */
private class R2dbcInternalProjectionState(settings: ProjectionSettings)(implicit val system: ActorSystem[_])
    extends InternalProjectionState[Offset, Envelope](
      projectionId,
      sourceProvider,
      offsetStrategy,
      handlerStrategy,
      statusObserver,
      settings) {

  implicit val executionContext: ExecutionContext = system.executionContext

  override val logger: LoggingAdapter = Logging(system.classicSystem, this.getClass)

  // True when the offset strategy is ExactlyOnce with a recovery strategy of Skip or RetryAndSkip.
  private val isExactlyOnceWithSkip: Boolean =
    offsetStrategy match {
      case ExactlyOnce(Some(Skip | _: RetryAndSkip)) => true
      case _                                         => false
    }

  /** Reads the management state and reports whether one exists and is marked as paused. */
  override def readPaused(): Future[Boolean] =
    for (managementState <- offsetStore.readManagementState()) yield managementState.exists(_.paused)

  /** Reads the stored offset, if any, from the offset store. */
  override def readOffsets(): Future[Option[Offset]] =
    offsetStore.readOffset()

  // Called from InternalProjectionState.saveOffsetAndReport
  // The `projectionId` argument is not used here; only the offset is handed to the offset store.
  override def saveOffset(projectionId: ProjectionId, offset: Offset): Future[Done] =
    offsetStore.saveOffset(offset)

  /**
   * Delegates to the parent implementation when the envelope is in flight according to the offset store,
   * or when the strategy is exactly-once with skip; otherwise completes immediately without saving.
   */
  override protected def saveOffsetAndReport(
      projectionId: ProjectionId,
      projectionContext: ProjectionContextImpl[Offset, Envelope],
      batchSize: Int): Future[Done] = {
    val shouldStore = offsetStore.isInflight(projectionContext.envelope) || isExactlyOnceWithSkip
    if (shouldStore) {
      super.saveOffsetAndReport(projectionId, projectionContext, batchSize)
    } else {
      R2dbcProjectionImpl.FutureDone
    }
  }

  /**
   * Saves the offsets of the accepted contexts in `batch` in one call, then reports progress for the
   * last accepted envelope and notifies telemetry with the summed group sizes.
   *
   * All contexts are accepted when the strategy is exactly-once with skip; otherwise only those whose
   * envelope is in flight according to the offset store. If nothing is accepted, nothing is saved.
   */
  override protected def saveOffsetsAndReport(
      projectionId: ProjectionId,
      batch: immutable.Seq[ProjectionContextImpl[Offset, Envelope]]): Future[Done] = {
    val toStore =
      if (isExactlyOnceWithSkip) batch.toVector
      else batch.iterator.filter(ctx => offsetStore.isInflight(ctx.envelope)).toVector

    toStore.lastOption match {
      case None =>
        R2dbcProjectionImpl.FutureDone
      case Some(lastAccepted) =>
        offsetStore.saveOffsets(toStore.map(_.offset)).map { done =>
          val storedCount = toStore.iterator.map(_.groupSize).sum
          // Best effort: a failing status observer must not fail the offset save.
          try {
            statusObserver.offsetProgress(projectionId, lastAccepted.envelope)
          } catch {
            case NonFatal(_) => // ignore
          }
          getTelemetry().onOffsetStored(storedCount)
          done
        }
    }
  }

  /** Creates the running projection around `mappedSource()` wrapped by `RunningProjection.withBackoff`. */
  private[projection] def newRunningInstance(): RunningProjection = {
    val sourceWithBackoff = RunningProjection.withBackoff(() => mappedSource(), settings)
    new R2dbcRunningProjection(sourceWithBackoff, this)
  }
}