in projection/src/main/scala/org/apache/pekko/projection/r2dbc/internal/R2dbcProjectionImpl.scala [160:192]
private[projection] def adaptedHandlerForExactlyOnce[Offset, Envelope](
sourceProvider: SourceProvider[Offset, Envelope],
handlerFactory: () => R2dbcHandler[Envelope],
offsetStore: R2dbcOffsetStore,
r2dbcExecutor: R2dbcExecutor)(implicit ec: ExecutionContext, system: ActorSystem[_]): () => Handler[Envelope] = {
() =>
new AdaptedR2dbcHandler(handlerFactory()) {
override def process(envelope: Envelope): Future[Done] = {
offsetStore.isAccepted(envelope).flatMap {
case true =>
if (skipEnvelope(envelope)) {
val offset = sourceProvider.extractOffset(envelope)
offsetStore.saveOffset(offset)
} else {
loadEnvelope(envelope, sourceProvider).flatMap { loadedEnvelope =>
val offset = sourceProvider.extractOffset(loadedEnvelope)
r2dbcExecutor.withConnection("exactly-once handler") { conn =>
// run users handler
val session = new R2dbcSession(conn)
delegate
.process(session, loadedEnvelope)
.flatMap { _ =>
offsetStore.saveOffsetInTx(conn, offset)
}
}
}
}
case false =>
FutureDone
}
}
}
}