in jdbc/src/main/scala/org/apache/pekko/projection/jdbc/scaladsl/JdbcProjection.scala [51:78]
/**
 * Creates an [[ExactlyOnceProjection]] implemented by `JdbcProjectionImpl`.
 *
 * An offset store is created from `sessionFactory` and handed, together with the
 * `handler` factory, to `JdbcProjectionImpl.adaptedHandlerForExactlyOnce`. The adapted
 * handler factory is wrapped in a `SingleHandlerStrategy` and paired with the
 * `ExactlyOnce()` offset strategy. No settings and no restart backoff are supplied
 * (both are `None`), and `NoopStatusObserver` is passed to the constructor.
 *
 * @tparam Offset   offset type of the `sourceProvider`
 * @tparam Envelope envelope type of the `sourceProvider` and of the handler
 * @tparam S        session type, a subtype of `JdbcSession`
 * @param projectionId   identifier forwarded to the handler adapter and to the projection
 * @param sourceProvider source forwarded to the handler adapter and to the projection
 * @param sessionFactory factory of sessions; used for the offset store, the handler
 *                       adapter and the projection itself
 * @param handler        factory of the `JdbcHandler` to adapt
 * @param system         implicit actor system required by the called helpers
 * @return the newly constructed projection
 */
def exactlyOnce[Offset, Envelope, S <: JdbcSession](
    projectionId: ProjectionId,
    sourceProvider: SourceProvider[Offset, Envelope],
    sessionFactory: () => S,
    handler: () => JdbcHandler[Envelope, S])(
    implicit system: ActorSystem[_]): ExactlyOnceProjection[Offset, Envelope] = {

  // The same store instance is shared by the handler adapter and the projection.
  val jdbcOffsetStore = JdbcProjectionImpl.createOffsetStore(sessionFactory)

  val exactlyOnceHandler = JdbcProjectionImpl.adaptedHandlerForExactlyOnce(
    projectionId,
    sourceProvider,
    sessionFactory,
    handler,
    jdbcOffsetStore)

  // Strategies are built in the same order in which they are passed below.
  val exactlyOnceStrategy = ExactlyOnce()
  val singleHandler = SingleHandlerStrategy(exactlyOnceHandler)

  new JdbcProjectionImpl(
    projectionId,
    sourceProvider,
    sessionFactory,
    settingsOpt = None,
    restartBackoffOpt = None,
    offsetStrategy = exactlyOnceStrategy,
    handlerStrategy = singleHandler,
    NoopStatusObserver,
    jdbcOffsetStore)
}