private[projection] def createOffsetStore[S <: JdbcSession](sessionFactory: () => S)()

in jdbc/src/main/scala/org/apache/pekko/projection/jdbc/internal/JdbcProjectionImpl.scala [60:84]


  /**
   * Creates a [[JdbcOffsetStore]] for sessions of type `S`.
   *
   * The settings handed to the store are obtained from `JdbcSettings(system)`.
   *
   * @param sessionFactory factory of sessions, passed to the store unchanged
   * @param system actor system given to the store and used to obtain the settings
   */
  private[projection] def createOffsetStore[S <: JdbcSession](sessionFactory: () => S)(
      implicit system: ActorSystem[_]) = {
    val settings = JdbcSettings(system)
    new JdbcOffsetStore[S](system, settings, sessionFactory)
  }

  /**
   * Builds a factory of [[Handler]]s that store the envelope's offset and run the
   * user's [[JdbcHandler]] using one and the same session.
   *
   * Every call of the returned function asks `handlerFactory` for a new delegate and
   * wraps it in an `AdaptedJdbcHandler`. For each envelope the wrapper:
   *  1. extracts the offset with `sourceProvider`,
   *  2. opens a session through `JdbcSessionUtil.withSession(sessionFactory)`,
   *  3. saves the offset on that session's connection, and
   *  4. calls the delegate with that session and the envelope.
   *
   * NOTE(review): presumably `withSession` commits or rolls back both steps as a unit,
   * which is what would make this "exactly once" -- confirm against `JdbcSessionUtil`.
   *
   * @param projectionId id under which the offset is saved
   * @param sourceProvider used to extract the offset from each envelope
   * @param sessionFactory factory of the sessions used per envelope
   * @param handlerFactory factory of the user's handler, invoked once per created handler
   * @param offsetStore store that saves the offset and supplies the execution context
   * @return a function creating a new adapted handler on each invocation
   */
  private[projection] def adaptedHandlerForExactlyOnce[Offset, Envelope, S <: JdbcSession](
      projectionId: ProjectionId,
      sourceProvider: SourceProvider[Offset, Envelope],
      sessionFactory: () => S,
      handlerFactory: () => JdbcHandler[Envelope, S],
      offsetStore: JdbcOffsetStore[S]): () => Handler[Envelope] = { () =>
    new AdaptedJdbcHandler(handlerFactory(), offsetStore.executionContext) {
      override def process(envelope: Envelope): Future[Done] = {
        val envelopeOffset = sourceProvider.extractOffset(envelope)
        val sessionResult = JdbcSessionUtil.withSession(sessionFactory) { session =>
          // store the offset first, on this session's connection
          session.withConnection[Unit] { connection =>
            offsetStore.saveOffsetBlocking(connection, projectionId, envelopeOffset)
          }
          // then hand the very same session to the user's handler
          delegate.process(session, envelope)
        }
        // callers only need completion, so the handler's result is replaced by Done
        sessionResult.map(_ => Done)
      }
    }
  }