private[projection] def adaptedHandlerForAtLeastOnceAsync[Offset, Envelope]()

in projection/src/main/scala/org/apache/pekko/projection/r2dbc/internal/R2dbcProjectionImpl.scala [264:291]


  /**
   * Builds a handler factory for asynchronous at-least-once processing.
   *
   * Every handler produced by the returned factory wraps a fresh handler obtained from
   * `handlerFactory` in an `AdaptedHandler` whose `process` does the following:
   *
   *  - asks `offsetStore.isAccepted` about the envelope; when the answer is `false` the
   *    delegate is not invoked and `FutureDone` is returned,
   *  - when the envelope is accepted and `skipEnvelope` is `true`, registers the envelope
   *    with `offsetStore.addInflight` and returns `FutureDone` without invoking the delegate,
   *  - otherwise passes the envelope through `loadEnvelope`, hands the result to the delegate,
   *    and registers the loaded envelope with `offsetStore.addInflight` once the delegate's
   *    future has completed successfully.
   *
   * @param sourceProvider passed to `loadEnvelope` together with the envelope
   * @param handlerFactory creates the delegate handler; called once per handler produced
   * @param offsetStore offset store consulted for acceptance and inflight bookkeeping
   * @return a factory of adapted handlers
   */
  private[projection] def adaptedHandlerForAtLeastOnceAsync[Offset, Envelope](
      sourceProvider: SourceProvider[Offset, Envelope],
      handlerFactory: () => Handler[Envelope],
      offsetStore: R2dbcOffsetStore)(implicit ec: ExecutionContext, system: ActorSystem[_]): () => Handler[Envelope] = {
    () =>
      new AdaptedHandler(handlerFactory()) {
        override def process(envelope: Envelope): Future[Done] =
          offsetStore.isAccepted(envelope).flatMap { accepted =>
            if (!accepted) {
              // Not accepted by the offset store: nothing to do for this envelope.
              FutureDone
            } else if (skipEnvelope(envelope)) {
              // Skipped envelopes bypass the delegate but are still tracked as inflight.
              offsetStore.addInflight(envelope)
              FutureDone
            } else {
              for {
                loaded <- loadEnvelope(envelope, sourceProvider)
                _ <- delegate.process(loaded)
              } yield {
                // Only reached when the delegate completed successfully.
                offsetStore.addInflight(loaded)
                Done
              }
            }
          }
      }
  }