def loadEnvelope[Envelope]()

in projection/src/main/scala/org/apache/pekko/projection/r2dbc/internal/R2dbcProjectionImpl.scala [95:158]


  def loadEnvelope[Envelope](env: Envelope, sourceProvider: SourceProvider[_, Envelope])(implicit
      ec: ExecutionContext): Future[Envelope] = {
    env match {
      case eventEnvelope: EventEnvelope[_] if eventEnvelope.eventOption.isEmpty && !skipEnvelope(eventEnvelope) =>
        val pid = eventEnvelope.persistenceId
        val seqNr = eventEnvelope.sequenceNr
        (sourceProvider match {
          case loadEventQuery: LoadEventQuery =>
            loadEventQuery.loadEnvelope[Any](pid, seqNr)
          case loadEventQuery: pekko.persistence.query.typed.javadsl.LoadEventQuery =>
            import pekko.util.FutureConverters._
            loadEventQuery.loadEnvelope[Any](pid, seqNr).asScala
          case _ =>
            throw new IllegalArgumentException(
              s"Expected sourceProvider [${sourceProvider.getClass.getName}] " +
              "to implement LoadEventQuery when used with eventsBySlices.")
        }).map { loadedEnv =>
          val count = loadEnvelopeCounter.incrementAndGet()
          if (count % 1000 == 0)
            log.info("Loaded event lazily, persistenceId [{}], seqNr [{}]. Load count [{}]", pid, seqNr: java.lang.Long,
              count: java.lang.Long)
          else
            log.debug("Loaded event lazily, persistenceId [{}], seqNr [{}]. Load count [{}]", pid,
              seqNr: java.lang.Long,
              count: java.lang.Long)
          loadedEnv.asInstanceOf[Envelope]
        }

      case upd: UpdatedDurableState[_] if upd.value == null =>
        val pid = upd.persistenceId
        val revision = upd.revision
        (sourceProvider match {
          case store: DurableStateStore[_] =>
            store.getObject(pid)
          case store: pekko.persistence.state.javadsl.DurableStateStore[_] =>
            import pekko.util.FutureConverters._
            store.getObject(pid).asScala.map(_.toScala)
        }).map {
          case GetObjectResult(Some(loadedValue), loadedRevision) =>
            val count = loadEnvelopeCounter.incrementAndGet()
            if (count % 1000 == 0)
              log.info(
                "Loaded durable state lazily, persistenceId [{}], revision [{}]. Load count [{}]",
                pid,
                loadedRevision: java.lang.Long,
                count: java.lang.Long)
            else
              log.debug(
                "Loaded durable state lazily, persistenceId [{}], revision [{}]. Load count [{}]",
                pid,
                loadedRevision: java.lang.Long,
                count: java.lang.Long)
            new UpdatedDurableState(pid, loadedRevision, loadedValue, upd.offset, upd.timestamp)
              .asInstanceOf[Envelope]
          case GetObjectResult(None, _) =>
            // FIXME use DeletedDurableState here when that is added
            throw new IllegalStateException(
              s"Durable state not found when loaded lazily, persistenceId [$pid], revision [$revision]")
        }

      case _ =>
        Future.successful(env)
    }
  }