in projection/src/main/scala/org/apache/pekko/projection/r2dbc/scaladsl/R2dbcProjection.scala [104:138]
def atLeastOnce[Offset, Envelope](
projectionId: ProjectionId,
settings: Option[R2dbcProjectionSettings],
sourceProvider: SourceProvider[Offset, Envelope],
handler: () => R2dbcHandler[Envelope])(implicit
system: ActorSystem[_]): AtLeastOnceProjection[Offset, Envelope] = {
val r2dbcSettings = settings.getOrElse(R2dbcProjectionSettings(system))
val connFactory = connectionFactory(system, r2dbcSettings)
val offsetStore =
R2dbcProjectionImpl.createOffsetStore(
projectionId,
timestampOffsetBySlicesSourceProvider(sourceProvider),
r2dbcSettings,
connFactory)
val r2dbcExecutor = new R2dbcExecutor(connFactory, R2dbcProjectionImpl.log, r2dbcSettings.logDbCallsExceeding)(
system.executionContext,
system)
val adaptedHandler =
R2dbcProjectionImpl.adaptedHandlerForAtLeastOnce(sourceProvider, handler, offsetStore, r2dbcExecutor)(
system.executionContext,
system)
new R2dbcProjectionImpl(
projectionId,
r2dbcSettings,
settingsOpt = None,
sourceProvider,
restartBackoffOpt = None,
offsetStrategy = AtLeastOnce(),
handlerStrategy = SingleHandlerStrategy(adaptedHandler),
NoopStatusObserver,
offsetStore)
}