in projection/src/main/scala/org/apache/pekko/projection/r2dbc/internal/R2dbcProjectionImpl.scala [230:262]
/**
 * Creates a factory of [[Handler]] instances that wrap the user's [[R2dbcHandler]].
 *
 * Each created handler processes an envelope as follows:
 *  - asks `offsetStore.isAccepted` whether the envelope should be handled; when it is not
 *    accepted the result is `FutureDone` and the delegate is not invoked;
 *  - when accepted but `skipEnvelope` returns true, the envelope is registered via
 *    `offsetStore.addInflight` and the result is `FutureDone`, again without invoking the delegate;
 *  - otherwise the envelope is loaded with `loadEnvelope`, the delegate is run with an
 *    [[R2dbcSession]] on a connection obtained from `r2dbcExecutor.withConnection`, and the loaded
 *    envelope is registered via `offsetStore.addInflight` once the delegate's future has
 *    completed successfully.
 *
 * @param sourceProvider passed to `loadEnvelope` when an envelope has to be loaded
 * @param handlerFactory creates the user handler that is wrapped; invoked once per created handler
 * @param offsetStore used for the acceptance check and for registering in-flight envelopes
 * @param r2dbcExecutor provides the connection on which the user handler runs
 * @return a factory producing a new adapted handler on each invocation
 */
private[projection] def adaptedHandlerForAtLeastOnce[Offset, Envelope](
    sourceProvider: SourceProvider[Offset, Envelope],
    handlerFactory: () => R2dbcHandler[Envelope],
    offsetStore: R2dbcOffsetStore,
    r2dbcExecutor: R2dbcExecutor)(implicit ec: ExecutionContext, system: ActorSystem[_]): () => Handler[Envelope] = {
  () =>
    new AdaptedR2dbcHandler(handlerFactory()) {
      override def process(envelope: Envelope): Future[Done] = {

        // Load the envelope, run the user's handler on a connection, and only after
        // that has succeeded register the loaded envelope as in-flight.
        def runDelegate(): Future[Done] =
          for {
            loaded <- loadEnvelope(envelope, sourceProvider)
            _ <- r2dbcExecutor.withConnection("at-least-once handler") { connection =>
              delegate.process(new R2dbcSession(connection), loaded)
            }
          } yield {
            offsetStore.addInflight(loaded)
            Done
          }

        offsetStore.isAccepted(envelope).flatMap { accepted =>
          if (!accepted) {
            // not accepted by the offset store: nothing to do
            FutureDone
          } else if (skipEnvelope(envelope)) {
            // skipped envelopes are still registered as in-flight, but the delegate is not run
            offsetStore.addInflight(envelope)
            FutureDone
          } else {
            runDelegate()
          }
        }
      }
    }
}