private[projection] def adaptedHandlerForExactlyOnce[Offset, Envelope]()

in projection/src/main/scala/org/apache/pekko/projection/r2dbc/internal/R2dbcProjectionImpl.scala [160:192]


  /**
   * Creates a factory of [[Handler]]s, each wrapping a fresh `R2dbcHandler` obtained by
   * invoking `handlerFactory` once per call of the returned function.
   *
   * For every envelope the adapted handler first consults `offsetStore.isAccepted`:
   *
   *  - not accepted: completes with `FutureDone` and the delegate is never invoked;
   *  - accepted and `skipEnvelope` holds: only the envelope's offset is stored, via
   *    `offsetStore.saveOffset`;
   *  - otherwise: the envelope goes through `loadEnvelope`, the delegate is run with an
   *    `R2dbcSession` on a connection supplied by `r2dbcExecutor.withConnection`, and, once
   *    the delegate's future has completed successfully, the offset is stored with
   *    `offsetStore.saveOffsetInTx` on that same connection.
   *
   * @param sourceProvider used to extract the offset from an envelope (and handed to `loadEnvelope`)
   * @param handlerFactory creates the user handler that is wrapped
   * @param offsetStore    decides whether an envelope is accepted and stores offsets
   * @param r2dbcExecutor  supplies the connection shared by the user handler and the offset write
   * @return a function producing a new adapted handler on each invocation
   */
  private[projection] def adaptedHandlerForExactlyOnce[Offset, Envelope](
      sourceProvider: SourceProvider[Offset, Envelope],
      handlerFactory: () => R2dbcHandler[Envelope],
      offsetStore: R2dbcOffsetStore,
      r2dbcExecutor: R2dbcExecutor)(implicit ec: ExecutionContext, system: ActorSystem[_]): () => Handler[Envelope] = {
    () =>
      new AdaptedR2dbcHandler(handlerFactory()) {
        override def process(envelope: Envelope): Future[Done] =
          offsetStore.isAccepted(envelope).flatMap { accepted =>
            if (!accepted) {
              // Rejected by the offset store: nothing to run, nothing to store.
              FutureDone
            } else if (skipEnvelope(envelope)) {
              // Skipped envelopes bypass the user handler; only their offset is recorded.
              val skippedOffset = sourceProvider.extractOffset(envelope)
              offsetStore.saveOffset(skippedOffset)
            } else {
              loadEnvelope(envelope, sourceProvider).flatMap { loaded =>
                // The offset is taken from the loaded envelope, before a connection is requested.
                val loadedOffset = sourceProvider.extractOffset(loaded)
                r2dbcExecutor.withConnection("exactly-once handler") { connection =>
                  // User handler first, then the offset write on the very same connection.
                  delegate
                    .process(new R2dbcSession(connection), loaded)
                    .flatMap(_ => offsetStore.saveOffsetInTx(connection, loadedOffset))
                }
              }
            }
          }
      }
  }