in projection/src/main/scala/org/apache/pekko/projection/r2dbc/internal/R2dbcProjectionImpl.scala [95:158]
/**
 * Fills in the payload of an envelope that was delivered without one.
 *
 *  - An `EventEnvelope` whose `eventOption` is empty and that is not rejected by `skipEnvelope`
 *    is reloaded by persistence id and sequence number through the `LoadEventQuery`
 *    (scaladsl or javadsl) implemented by `sourceProvider`.
 *  - An `UpdatedDurableState` whose `value` is `null` is rebuilt from the object returned by
 *    `getObject` on the `DurableStateStore` (scaladsl or javadsl) implemented by `sourceProvider`.
 *    The loaded revision and value are used; the original offset and timestamp are kept.
 *  - Any other envelope is returned unchanged in an already completed `Future`.
 *
 * Every successful lazy load increments `loadEnvelopeCounter`; every 1000th load is logged at
 * info level, all others at debug level.
 *
 * @param env the envelope to complete
 * @param sourceProvider the source provider, which must also implement the query/store interface
 *                       needed for the kind of envelope being loaded
 * @param ec execution context used to transform the asynchronous load result
 * @return the envelope with its payload loaded, or `env` itself when nothing had to be loaded.
 *         The future fails with `IllegalStateException` when the durable state no longer exists.
 * @throws java.lang.IllegalArgumentException (synchronously) when `sourceProvider` does not
 *         implement the interface required to load the payload
 */
def loadEnvelope[Envelope](env: Envelope, sourceProvider: SourceProvider[_, Envelope])(implicit
    ec: ExecutionContext): Future[Envelope] = {
  env match {
    case eventEnvelope: EventEnvelope[_] if eventEnvelope.eventOption.isEmpty && !skipEnvelope(eventEnvelope) =>
      val pid = eventEnvelope.persistenceId
      val seqNr = eventEnvelope.sequenceNr
      (sourceProvider match {
        case loadEventQuery: LoadEventQuery =>
          loadEventQuery.loadEnvelope[Any](pid, seqNr)
        case loadEventQuery: pekko.persistence.query.typed.javadsl.LoadEventQuery =>
          import pekko.util.FutureConverters._
          loadEventQuery.loadEnvelope[Any](pid, seqNr).asScala
        case _ =>
          throw new IllegalArgumentException(
            s"Expected sourceProvider [${sourceProvider.getClass.getName}] " +
            "to implement LoadEventQuery when used with eventsBySlices.")
      }).map { loadedEnv =>
        val count = loadEnvelopeCounter.incrementAndGet()
        // Only every 1000th load is reported at info level to keep the log volume down.
        if (count % 1000 == 0)
          log.info("Loaded event lazily, persistenceId [{}], seqNr [{}]. Load count [{}]", pid, seqNr: java.lang.Long,
            count: java.lang.Long)
        else
          log.debug("Loaded event lazily, persistenceId [{}], seqNr [{}]. Load count [{}]", pid,
            seqNr: java.lang.Long,
            count: java.lang.Long)
        // The query was invoked with `Any` as event type, so the result is cast back to the caller's type.
        loadedEnv.asInstanceOf[Envelope]
      }
    case upd: UpdatedDurableState[_] if upd.value == null =>
      val pid = upd.persistenceId
      val revision = upd.revision
      (sourceProvider match {
        case store: DurableStateStore[_] =>
          store.getObject(pid)
        case store: pekko.persistence.state.javadsl.DurableStateStore[_] =>
          import pekko.util.FutureConverters._
          store.getObject(pid).asScala.map(_.toScala)
        case _ =>
          // Without this case an unsupported provider would surface as an opaque scala.MatchError;
          // fail with the same kind of descriptive error as the event branch above.
          throw new IllegalArgumentException(
            s"Expected sourceProvider [${sourceProvider.getClass.getName}] " +
            "to implement DurableStateStore when loading durable state lazily.")
      }).map {
        case GetObjectResult(Some(loadedValue), loadedRevision) =>
          val count = loadEnvelopeCounter.incrementAndGet()
          // Only every 1000th load is reported at info level to keep the log volume down.
          if (count % 1000 == 0)
            log.info(
              "Loaded durable state lazily, persistenceId [{}], revision [{}]. Load count [{}]",
              pid,
              loadedRevision: java.lang.Long,
              count: java.lang.Long)
          else
            log.debug(
              "Loaded durable state lazily, persistenceId [{}], revision [{}]. Load count [{}]",
              pid,
              loadedRevision: java.lang.Long,
              count: java.lang.Long)
          // The revision returned by the store is used, not the one carried by the original envelope.
          new UpdatedDurableState(pid, loadedRevision, loadedValue, upd.offset, upd.timestamp)
            .asInstanceOf[Envelope]
        case GetObjectResult(None, _) =>
          // FIXME use DeletedDurableState here when that is added
          throw new IllegalStateException(
            s"Durable state not found when loaded lazily, persistenceId [$pid], revision [$revision]")
      }
    case _ =>
      // Payload already present (or not a lazily loadable envelope): nothing to do.
      Future.successful(env)
  }
}