in jdbc/src/main/scala/org/apache/pekko/projection/jdbc/internal/JdbcProjectionImpl.scala [60:84]
private[projection] def createOffsetStore[S <: JdbcSession](sessionFactory: () => S)(
implicit system: ActorSystem[_]) =
new JdbcOffsetStore[S](system, JdbcSettings(system), sessionFactory)
private[projection] def adaptedHandlerForExactlyOnce[Offset, Envelope, S <: JdbcSession](
projectionId: ProjectionId,
sourceProvider: SourceProvider[Offset, Envelope],
sessionFactory: () => S,
handlerFactory: () => JdbcHandler[Envelope, S],
offsetStore: JdbcOffsetStore[S]): () => Handler[Envelope] = { () =>
new AdaptedJdbcHandler(handlerFactory(), offsetStore.executionContext) {
override def process(envelope: Envelope): Future[Done] = {
val offset = sourceProvider.extractOffset(envelope)
JdbcSessionUtil
.withSession(sessionFactory) { sess =>
sess.withConnection[Unit] { conn =>
offsetStore.saveOffsetBlocking(conn, projectionId, offset)
}
// run users handler
delegate.process(sess, envelope)
}
.map(_ => Done)
}
}
}