private[projection] def adaptedHandlerForGrouped[Offset, Envelope]()

in projection/src/main/scala/org/apache/pekko/projection/r2dbc/internal/R2dbcProjectionImpl.scala [194:228]


  private[projection] def adaptedHandlerForGrouped[Offset, Envelope](
      sourceProvider: SourceProvider[Offset, Envelope],
      handlerFactory: () => R2dbcHandler[immutable.Seq[Envelope]],
      offsetStore: R2dbcOffsetStore,
      r2dbcExecutor: R2dbcExecutor)(
      implicit
      ec: ExecutionContext,
      system: ActorSystem[_]): () => Handler[immutable.Seq[Envelope]] = { () =>
    new AdaptedR2dbcHandler(handlerFactory()) {
      override def process(envelopes: immutable.Seq[Envelope]): Future[Done] = {
        offsetStore.filterAccepted(envelopes).flatMap { acceptedEnvelopes =>
          if (acceptedEnvelopes.isEmpty) {
            FutureDone
          } else {
            Future.sequence(acceptedEnvelopes.map(env => loadEnvelope(env, sourceProvider))).flatMap {
              loadedEnvelopes =>
                val offsets = loadedEnvelopes.iterator.map(sourceProvider.extractOffset).toVector
                val filteredEnvelopes = loadedEnvelopes.filterNot(skipEnvelope)
                if (filteredEnvelopes.isEmpty) {
                  offsetStore.saveOffsets(offsets)
                } else {
                  r2dbcExecutor.withConnection("grouped handler") { conn =>
                    // run users handler
                    val session = new R2dbcSession(conn)
                    delegate.process(session, filteredEnvelopes).flatMap { _ =>
                      offsetStore.saveOffsetsInTx(conn, offsets)
                    }
                  }
                }
            }
          }
        }
      }
    }
  }