def atLeastOnce[Offset, Envelope]()

in projection/src/main/scala/org/apache/pekko/projection/r2dbc/scaladsl/R2dbcProjection.scala [104:138]


  def atLeastOnce[Offset, Envelope](
      projectionId: ProjectionId,
      settings: Option[R2dbcProjectionSettings],
      sourceProvider: SourceProvider[Offset, Envelope],
      handler: () => R2dbcHandler[Envelope])(implicit
      system: ActorSystem[_]): AtLeastOnceProjection[Offset, Envelope] = {

    val r2dbcSettings = settings.getOrElse(R2dbcProjectionSettings(system))
    val connFactory = connectionFactory(system, r2dbcSettings)
    val offsetStore =
      R2dbcProjectionImpl.createOffsetStore(
        projectionId,
        timestampOffsetBySlicesSourceProvider(sourceProvider),
        r2dbcSettings,
        connFactory)
    val r2dbcExecutor = new R2dbcExecutor(connFactory, R2dbcProjectionImpl.log, r2dbcSettings.logDbCallsExceeding)(
      system.executionContext,
      system)

    val adaptedHandler =
      R2dbcProjectionImpl.adaptedHandlerForAtLeastOnce(sourceProvider, handler, offsetStore, r2dbcExecutor)(
        system.executionContext,
        system)

    new R2dbcProjectionImpl(
      projectionId,
      r2dbcSettings,
      settingsOpt = None,
      sourceProvider,
      restartBackoffOpt = None,
      offsetStrategy = AtLeastOnce(),
      handlerStrategy = SingleHandlerStrategy(adaptedHandler),
      NoopStatusObserver,
      offsetStore)
  }