in projection/src/main/scala/org/apache/pekko/projection/r2dbc/internal/R2dbcProjectionImpl.scala [325:354]
/**
 * Wraps a user-supplied flow `handler` with an offset-store check in front of it.
 *
 * Each incoming envelope is checked with `offsetStore.isAccepted`. Envelopes that are not
 * accepted are dropped and never reach `handler`. Accepted envelopes are passed through
 * `loadEnvelope`, the loaded envelope is registered with `offsetStore.addInflight`, and it
 * is then emitted downstream into `handler`.
 *
 * Envelopes for which `skipEnvelope` is true are NOT filtered out here: the fact is only
 * logged at info level and the envelope is still emitted.
 *
 * @param sourceProvider passed to `loadEnvelope` when loading an accepted envelope
 * @param handler the flow that processes the accepted, loaded envelopes
 * @param offsetStore used for the acceptance check and for in-flight registration
 * @return a flow that performs the check/load step and then runs `handler`
 */
private[projection] def adaptedHandlerForFlow[Offset, Envelope](
    sourceProvider: SourceProvider[Offset, Envelope],
    handler: FlowWithContext[Envelope, ProjectionContext, Done, ProjectionContext, _],
    offsetStore: R2dbcOffsetStore)(implicit
    ec: ExecutionContext,
    system: ActorSystem[_]): FlowWithContext[Envelope, ProjectionContext, Done, ProjectionContext, _] = {

  // Resolves to Some(loaded envelope) when the offset store accepts the envelope, None otherwise.
  def acceptAndLoad(incoming: Envelope): Future[Option[Envelope]] =
    offsetStore.isAccepted(incoming).flatMap {
      case false =>
        Future.successful(None)

      case true =>
        // Skipping is not applied in this flow; it is only reported.
        if (skipEnvelope(incoming))
          log.info("atLeastOnceFlow doesn't support of skipping envelopes. Envelope [{}] still emitted.", incoming)

        loadEnvelope(incoming, sourceProvider).map { loaded =>
          // Register as in-flight before the envelope is handed to the handler.
          offsetStore.addInflight(loaded)
          Some(loaded)
        }
    }

  FlowWithContext[Envelope, ProjectionContext]
    // Parallelism 1: at most one acceptance check / load is outstanding at a time.
    .mapAsync(1)(incoming => acceptAndLoad(incoming))
    // Drop the envelopes that were not accepted.
    .collect { case Some(accepted) => accepted }
    .via(handler)
}