in slick/src/main/scala/org/apache/pekko/projection/slick/SlickProjection.scala [148:181]
def atLeastOnce[Offset, Envelope, P <: JdbcProfile: ClassTag](
    projectionId: ProjectionId,
    sourceProvider: SourceProvider[Offset, Envelope],
    databaseConfig: DatabaseConfig[P],
    handler: () => SlickHandler[Envelope])(
    implicit system: ActorSystem[_]): AtLeastOnceProjection[Offset, Envelope] = {
  // Assembles a SlickProjectionImpl that uses the AtLeastOnce() offset strategy and a
  // single-handler strategy. No explicit settings or restart backoff are supplied
  // (both are passed as None), the status observer is the no-op one, and the offset
  // store is created from the same `databaseConfig` that runs the handler's actions.

  // Brings the profile's DBIO extension methods (e.g. `transactionally`) into scope.
  import databaseConfig.profile.api._

  // Bridges the user-facing SlickHandler (which returns a DBIO) to the generic Handler
  // (which returns a Future) by executing the DBIO against `databaseConfig.db`.
  class TransactionalHandler extends Handler[Envelope] {
    // Required implicitly by `map` on the DBIO produced by the user's handler.
    private implicit val dbioExecutor: ExecutionContext = system.executionContext

    // One user handler is created per adapter instance.
    private val underlying = handler()

    override def process(envelope: Envelope): Future[Done] = {
      // The user's action may be composed of several DBIOActions; run them all in a
      // single transaction and discard the result in favour of Done.
      val singleTransaction = underlying.process(envelope).map(_ => Done).transactionally
      databaseConfig.db.run(singleTransaction)
    }

    // Lifecycle hooks are forwarded to the user's handler unchanged.
    override def start(): Future[Done] = underlying.start()

    override def stop(): Future[Done] = underlying.stop()
  }

  // Each invocation of the factory yields a fresh adapter (and thus a fresh user handler).
  val handlerFactory: () => Handler[Envelope] = () => new TransactionalHandler

  new SlickProjectionImpl(
    projectionId,
    sourceProvider,
    databaseConfig,
    settingsOpt = None,
    restartBackoffOpt = None,
    AtLeastOnce(),
    SingleHandlerStrategy(handlerFactory),
    NoopStatusObserver,
    createOffsetStore(databaseConfig))
}