private[projection] def adaptedHandlerForGroupedAsync[Offset, Envelope]()

in projection/src/main/scala/org/apache/pekko/projection/r2dbc/internal/R2dbcProjectionImpl.scala [292:323]


  /**
   * Builds a handler factory that wraps the user supplied grouped handler with offset-store
   * filtering and envelope loading before the group is handed to the delegate.
   *
   * For every group passed to `process` the wrapped handler:
   *  1. asks `offsetStore.filterAccepted` which envelopes to keep; an empty result completes
   *     with `FutureDone` without touching the delegate,
   *  1. runs `loadEnvelope` on each accepted envelope and waits for all of them,
   *  1. drops the loaded envelopes for which `skipEnvelope` is true,
   *  1. passes the remainder (if any) to the delegate handler,
   *  1. calls `offsetStore.addInflights` with ''all'' loaded envelopes, including the skipped
   *     ones. When the delegate is invoked this happens only after its future has completed
   *     successfully; when nothing is left to process it happens immediately.
   *
   * @param sourceProvider forwarded to `loadEnvelope` for each accepted envelope
   * @param handlerFactory creates the delegate handler; invoked once per created handler
   * @param offsetStore    used for filtering accepted envelopes and for registering in-flight ones
   * @param ec             execution context for the `Future` combinators
   * @param system         not referenced explicitly in this method body
   *                       (NOTE(review): may be picked up implicitly by helpers - confirm)
   * @return a factory producing the adapted handler
   */
  private[projection] def adaptedHandlerForGroupedAsync[Offset, Envelope](
      sourceProvider: SourceProvider[Offset, Envelope],
      handlerFactory: () => Handler[immutable.Seq[Envelope]],
      offsetStore: R2dbcOffsetStore)(implicit
      ec: ExecutionContext,
      system: ActorSystem[_]): () => Handler[immutable.Seq[Envelope]] = { () =>
    new AdaptedHandler(handlerFactory()) {
      override def process(envelopes: immutable.Seq[Envelope]): Future[Done] =
        offsetStore.filterAccepted(envelopes).flatMap { accepted =>
          if (accepted.isEmpty)
            // Nothing in this group was accepted by the offset store.
            FutureDone
          else {
            val pendingLoads = accepted.map(envelope => loadEnvelope(envelope, sourceProvider))
            Future.sequence(pendingLoads).flatMap { loaded =>
              val toProcess = loaded.filterNot(skipEnvelope)
              if (toProcess.nonEmpty)
                delegate.process(toProcess).map { _ =>
                  // Registered only once the delegate has completed successfully;
                  // the full loaded group is used, skipped envelopes included.
                  offsetStore.addInflights(loaded)
                  Done
                }
              else {
                // Every loaded envelope was skipped: still register them, delegate is not called.
                offsetStore.addInflights(loaded)
                FutureDone
              }
            }
          }
        }
    }
  }