in projection/src/main/scala/org/apache/pekko/projection/r2dbc/internal/R2dbcProjectionImpl.scala [194:228]
/**
 * Creates a factory of handlers for grouped envelopes.
 *
 * Every invocation of the returned function asks `handlerFactory` for a new
 * [[R2dbcHandler]] and wraps it in an `AdaptedR2dbcHandler`. The wrapper's `process`:
 *
 *  1. passes the incoming group through `offsetStore.filterAccepted`;
 *  1. completes with `FutureDone` when nothing is left after that step;
 *  1. otherwise loads every remaining envelope with `loadEnvelope`;
 *  1. extracts the offsets of all loaded envelopes, then drops the envelopes for
 *     which `skipEnvelope` is true;
 *  1. if no envelope remains, stores the offsets with `offsetStore.saveOffsets`;
 *     otherwise runs the delegate handler on a connection obtained from
 *     `r2dbcExecutor.withConnection` and stores the offsets with
 *     `offsetStore.saveOffsetsInTx` on that same connection.
 *
 * @param sourceProvider used to load envelopes and to extract their offsets
 * @param handlerFactory supplies the user handler wrapped by each created handler
 * @param offsetStore    filters the incoming envelopes and persists the offsets
 * @param r2dbcExecutor  provides the connection on which the user handler runs
 * @return a factory producing a new adapted handler on each call
 */
private[projection] def adaptedHandlerForGrouped[Offset, Envelope](
    sourceProvider: SourceProvider[Offset, Envelope],
    handlerFactory: () => R2dbcHandler[immutable.Seq[Envelope]],
    offsetStore: R2dbcOffsetStore,
    r2dbcExecutor: R2dbcExecutor)(
    implicit
    ec: ExecutionContext,
    system: ActorSystem[_]): () => Handler[immutable.Seq[Envelope]] = { () =>
  new AdaptedR2dbcHandler(handlerFactory()) {
    override def process(envelopes: immutable.Seq[Envelope]): Future[Done] =
      offsetStore.filterAccepted(envelopes).flatMap {
        case accepted if accepted.isEmpty =>
          // Nothing left after the offset store's filtering: no load, no save.
          FutureDone

        case accepted =>
          Future
            .sequence(accepted.map(envelope => loadEnvelope(envelope, sourceProvider)))
            .flatMap { loaded =>
              // Offsets are taken from every loaded envelope, including the ones skipped
              // below, so they are stored even when the user handler never sees them.
              val offsets = loaded.iterator.map(sourceProvider.extractOffset).toVector
              val toProcess = loaded.filterNot(skipEnvelope)

              if (toProcess.nonEmpty) {
                r2dbcExecutor.withConnection("grouped handler") { connection =>
                  // Run the user's handler, then store the offsets on the same connection.
                  // NOTE(review): presumably both happen in one transaction - confirm
                  // against R2dbcExecutor.withConnection.
                  delegate
                    .process(new R2dbcSession(connection), toProcess)
                    .flatMap(_ => offsetStore.saveOffsetsInTx(connection, offsets))
                }
              } else {
                // Every loaded envelope was skipped: only the offsets need storing.
                offsetStore.saveOffsets(offsets)
              }
            }
      }
  }
}