in projection/src/main/scala/org/apache/pekko/projection/r2dbc/internal/R2dbcProjectionImpl.scala [292:323]
/**
 * Builds a handler factory for grouped, asynchronous processing in which every handler
 * produced by `handlerFactory` is wrapped in an `AdaptedHandler`.
 *
 * For each batch the wrapping handler:
 *  1. asks `offsetStore.filterAccepted` which envelopes to keep; if none remain the batch
 *     completes with `FutureDone` and nothing else happens,
 *  1. loads every remaining envelope through `loadEnvelope`,
 *  1. drops the loaded envelopes for which `skipEnvelope` holds and hands the rest to the
 *     wrapped handler (`delegate`),
 *  1. registers ALL loaded envelopes (including the skipped ones) with
 *     `offsetStore.addInflights` - right away when nothing was left to process, otherwise
 *     only once the delegate's future has completed successfully.
 *
 * @param sourceProvider passed on to `loadEnvelope` for each accepted envelope
 * @param handlerFactory creates the handler to wrap; called once per invocation of the
 *                       returned factory
 * @param offsetStore    used to filter incoming envelopes and to record inflight ones
 * @return a factory producing the wrapping handlers
 */
private[projection] def adaptedHandlerForGroupedAsync[Offset, Envelope](
    sourceProvider: SourceProvider[Offset, Envelope],
    handlerFactory: () => Handler[immutable.Seq[Envelope]],
    offsetStore: R2dbcOffsetStore)(implicit
    ec: ExecutionContext,
    system: ActorSystem[_]): () => Handler[immutable.Seq[Envelope]] = { () =>
  new AdaptedHandler(handlerFactory()) {
    override def process(envelopes: immutable.Seq[Envelope]): Future[Done] =
      offsetStore.filterAccepted(envelopes).flatMap {
        case accepted if accepted.isEmpty =>
          // The offset store kept nothing from this batch.
          FutureDone

        case accepted =>
          // All loads are started up front; the batch proceeds once every one has completed.
          val pendingLoads = accepted.map(loadEnvelope(_, sourceProvider))
          Future.sequence(pendingLoads).flatMap { loaded =>
            val toProcess = loaded.filterNot(skipEnvelope)
            if (toProcess.nonEmpty) {
              // Record the inflight envelopes only after the delegate has finished successfully.
              for (_ <- delegate.process(toProcess)) yield {
                offsetStore.addInflights(loaded)
                Done
              }
            } else {
              // Every loaded envelope was skipped: still record them, without calling the delegate.
              offsetStore.addInflights(loaded)
              FutureDone
            }
          }
      }
  }
}