in projection/src/main/scala/org/apache/pekko/projection/r2dbc/internal/R2dbcProjectionImpl.scala [264:291]
private[projection] def adaptedHandlerForAtLeastOnceAsync[Offset, Envelope](
sourceProvider: SourceProvider[Offset, Envelope],
handlerFactory: () => Handler[Envelope],
offsetStore: R2dbcOffsetStore)(implicit ec: ExecutionContext, system: ActorSystem[_]): () => Handler[Envelope] = {
() =>
new AdaptedHandler(handlerFactory()) {
override def process(envelope: Envelope): Future[Done] = {
offsetStore.isAccepted(envelope).flatMap {
case true =>
if (skipEnvelope(envelope)) {
offsetStore.addInflight(envelope)
FutureDone
} else {
loadEnvelope(envelope, sourceProvider).flatMap { loadedEnvelope =>
delegate
.process(loadedEnvelope)
.map { _ =>
offsetStore.addInflight(loadedEnvelope)
Done
}
}
}
case false =>
FutureDone
}
}
}
}