in slick/src/main/scala/org/apache/pekko/projection/slick/SlickProjection.scala [67:131]
def exactlyOnce[Offset, Envelope, P <: JdbcProfile: ClassTag](
projectionId: ProjectionId,
sourceProvider: SourceProvider[Offset, Envelope],
databaseConfig: DatabaseConfig[P],
handler: () => SlickHandler[Envelope])(
implicit system: ActorSystem[_]): ExactlyOnceProjection[Offset, Envelope] = {
val offsetStore = createOffsetStore(databaseConfig)
val adaptedSlickHandler: () => Handler[Envelope] = () =>
new Handler[Envelope] {
private implicit val ec: ExecutionContext = system.executionContext
private val logger = Logging(system.classicSystem, classOf[SlickProjectionImpl[_, _, _]])
private val delegate = handler()
import databaseConfig.profile.api._
override def process(envelope: Envelope): Future[Done] = {
val offset = sourceProvider.extractOffset(envelope)
val processedDBIO = offsetStore
.saveOffset(projectionId, offset)
.flatMap(_ => delegate.process(envelope))
val verifiedDBIO =
sourceProvider match {
case vsp: VerifiableSourceProvider[Offset, Envelope] =>
processedDBIO.flatMap { action =>
vsp.verifyOffset(offset) match {
case VerificationSuccess => slick.dbio.DBIO.successful(action)
case VerificationFailure(reason) =>
logger.warning(
"The offset failed source provider verification after the envelope was processed. " +
"The transaction will not be executed. Skipping envelope with reason: {}",
reason)
slick.dbio.DBIO.failed(VerificationFailureException)
}
}
case _ => processedDBIO
}
// run user function and offset storage on the same transaction
// any side-effect in user function is at-least-once
databaseConfig.db
.run(verifiedDBIO.transactionally)
.recover {
case VerificationFailureException => Done
}
.map(_ => Done)
}
override def start(): Future[Done] = delegate.start()
override def stop(): Future[Done] = delegate.stop()
}
new SlickProjectionImpl(
projectionId,
sourceProvider,
databaseConfig,
settingsOpt = None,
restartBackoffOpt = None,
ExactlyOnce(),
SingleHandlerStrategy(adaptedSlickHandler),
NoopStatusObserver,
offsetStore)
}