private[projection] def adaptedHandlerForFlow[Offset, Envelope]()

in projection/src/main/scala/org/apache/pekko/projection/r2dbc/internal/R2dbcProjectionImpl.scala [325:354]


  private[projection] def adaptedHandlerForFlow[Offset, Envelope](
      sourceProvider: SourceProvider[Offset, Envelope],
      handler: FlowWithContext[Envelope, ProjectionContext, Done, ProjectionContext, _],
      offsetStore: R2dbcOffsetStore)(implicit
      ec: ExecutionContext,
      system: ActorSystem[_]): FlowWithContext[Envelope, ProjectionContext, Done, ProjectionContext, _] = {

    FlowWithContext[Envelope, ProjectionContext]
      .mapAsync(1) { env =>
        offsetStore
          .isAccepted(env)
          .flatMap { ok =>
            if (ok) {
              if (skipEnvelope(env)) {
                log.info("atLeastOnceFlow doesn't support of skipping envelopes. Envelope [{}] still emitted.", env)
              }
              loadEnvelope(env, sourceProvider).map { loadedEnvelope =>
                offsetStore.addInflight(loadedEnvelope)
                Some(loadedEnvelope)
              }
            } else {
              Future.successful(None)
            }
          }
      }
      .collect { case Some(env) =>
        env
      }
      .via(handler)
  }