def groupedWithin[Offset, Envelope, P <: JdbcProfile: ClassTag]()

in slick/src/main/scala/org/apache/pekko/projection/slick/SlickProjection.scala [226:290]


  /**
   * Creates a grouped projection backed by Slick, in which each group of envelopes is handed to the
   * user-supplied [[SlickHandler]] and the group's offset is stored in the same database transaction.
   *
   * For every group the adapted handler:
   *  1. extracts the offset of the last envelope in the group,
   *  1. composes "save that offset" followed by the delegate's `process` into one `DBIO`,
   *  1. when the source provider is a [[VerifiableSourceProvider]], verifies the offset after the
   *     delegate's action and fails the `DBIO` with `VerificationFailureException` on a negative result,
   *  1. runs the composed action `transactionally` and maps a verification failure back to `Done`.
   *
   * @param projectionId   identifier under which the offset is saved
   * @param sourceProvider supplies the offset of an envelope and, optionally, offset verification
   * @param databaseConfig Slick configuration used to create the offset store and to run the actions
   * @param handler        factory for the user handler; invoked once per adapted handler instance
   * @param system         actor system providing the execution context and the logging adapter
   * @return a `SlickProjectionImpl` using `ExactlyOnce()` with a `GroupedHandlerStrategy`, no explicit
   *         settings, no restart backoff and a `NoopStatusObserver`
   */
  def groupedWithin[Offset, Envelope, P <: JdbcProfile: ClassTag](
      projectionId: ProjectionId,
      sourceProvider: SourceProvider[Offset, Envelope],
      databaseConfig: DatabaseConfig[P],
      handler: () => SlickHandler[immutable.Seq[Envelope]])(
      implicit system: ActorSystem[_]): GroupedProjection[Offset, Envelope] = {

    val offsetStore = createOffsetStore(databaseConfig)

    // Wraps the DBIO-based user handler into a Future-based Handler that owns the transaction.
    val handlerFactory: () => Handler[immutable.Seq[Envelope]] = () =>
      new Handler[immutable.Seq[Envelope]] {

        import databaseConfig.profile.api._
        private implicit val ec: ExecutionContext = system.executionContext
        private val logger = Logging(system.classicSystem, classOf[SlickProjectionImpl[_, _, _]])
        private val delegate = handler()

        // Lifecycle calls are forwarded untouched to the user handler.
        override def start(): Future[Done] = delegate.start()
        override def stop(): Future[Done] = delegate.stop()

        override def process(envelopes: immutable.Seq[Envelope]): Future[Done] = {

          // The offset of the last envelope represents the whole group.
          val groupOffset = sourceProvider.extractOffset(envelopes.last)

          // Offset save first, then the user-provided action, composed into one DBIO.
          val saveThenHandle =
            offsetStore.saveOffset(projectionId, groupOffset).flatMap(_ => delegate.process(envelopes))

          // Only verifiable source providers get the post-processing offset check.
          val guarded =
            sourceProvider match {
              case verifiable: VerifiableSourceProvider[Offset, Envelope] =>
                saveThenHandle.flatMap { handled =>
                  verifiable.verifyOffset(groupOffset) match {
                    case VerificationFailure(reason) =>
                      logger.warning(
                        "The offset failed source provider verification after the envelope was processed. " +
                        "The transaction will not be executed. Skipping envelope with reason: {}",
                        reason)
                      // Failing the DBIO makes the surrounding transaction fail as well.
                      slick.dbio.DBIO.failed(VerificationFailureException)
                    case VerificationSuccess =>
                      slick.dbio.DBIO.successful(handled)
                  }
                }
              case _ => saveThenHandle
            }

          // User function and offset storage share a single transaction;
          // any side-effect performed by the user function is therefore at-least-once.
          val outcome = databaseConfig.db.run(guarded.transactionally)

          // A verification failure is not an error for the caller: the group is simply skipped.
          val tolerant = outcome.recover {
            case VerificationFailureException => Done
          }
          tolerant.map(_ => Done)
        }
      }

    new SlickProjectionImpl(
      projectionId,
      sourceProvider,
      databaseConfig,
      settingsOpt = None,
      restartBackoffOpt = None,
      ExactlyOnce(),
      GroupedHandlerStrategy(handlerFactory),
      NoopStatusObserver,
      offsetStore)
  }