in slick/src/main/scala/org/apache/pekko/projection/slick/SlickProjection.scala [226:290]
def groupedWithin[Offset, Envelope, P <: JdbcProfile: ClassTag](
projectionId: ProjectionId,
sourceProvider: SourceProvider[Offset, Envelope],
databaseConfig: DatabaseConfig[P],
handler: () => SlickHandler[immutable.Seq[Envelope]])(
implicit system: ActorSystem[_]): GroupedProjection[Offset, Envelope] = {
val offsetStore = createOffsetStore(databaseConfig)
val adaptedSlickHandler: () => Handler[immutable.Seq[Envelope]] = () =>
new Handler[immutable.Seq[Envelope]] {
import databaseConfig.profile.api._
private implicit val ec: ExecutionContext = system.executionContext
private val logger = Logging(system.classicSystem, classOf[SlickProjectionImpl[_, _, _]])
private val delegate = handler()
override def process(envelopes: immutable.Seq[Envelope]): Future[Done] = {
val lastOffset = sourceProvider.extractOffset(envelopes.last)
val processedDBIO = offsetStore
.saveOffset(projectionId, lastOffset)
.flatMap(_ => delegate.process(envelopes))
val verifiedDBIO =
sourceProvider match {
case vsp: VerifiableSourceProvider[Offset, Envelope] =>
processedDBIO.flatMap { action =>
vsp.verifyOffset(lastOffset) match {
case VerificationSuccess => slick.dbio.DBIO.successful(action)
case VerificationFailure(reason) =>
logger.warning(
"The offset failed source provider verification after the envelope was processed. " +
"The transaction will not be executed. Skipping envelope with reason: {}",
reason)
slick.dbio.DBIO.failed(VerificationFailureException)
}
}
case _ => processedDBIO
}
// run user function and offset storage on the same transaction
// any side-effect in user function is at-least-once
databaseConfig.db
.run(verifiedDBIO.transactionally)
.recover {
case VerificationFailureException => Done
}
.map(_ => Done)
}
override def start(): Future[Done] = delegate.start()
override def stop(): Future[Done] = delegate.stop()
}
new SlickProjectionImpl(
projectionId,
sourceProvider,
databaseConfig,
settingsOpt = None,
restartBackoffOpt = None,
ExactlyOnce(),
GroupedHandlerStrategy(adaptedSlickHandler),
NoopStatusObserver,
offsetStore)
}