in eventsourced/src/main/scala/org/apache/pekko/projection/eventsourced/scaladsl/EventSourcedProvider.scala [84:137]
def sliceForPersistenceId(system: ActorSystem[_], readJournalPluginId: String, persistenceId: String): Int =
PersistenceQuery(system)
.readJournalFor[EventsBySliceQuery](readJournalPluginId)
.sliceForPersistenceId(persistenceId)
def sliceRanges(system: ActorSystem[_], readJournalPluginId: String, numberOfRanges: Int): immutable.Seq[Range] =
PersistenceQuery(system).readJournalFor[EventsBySliceQuery](readJournalPluginId).sliceRanges(numberOfRanges)
private class EventsBySlicesSourceProvider[Event](
eventsBySlicesQuery: EventsBySliceQuery,
entityType: String,
override val minSlice: Int,
override val maxSlice: Int,
system: ActorSystem[_])
extends SourceProvider[Offset, pekko.persistence.query.typed.EventEnvelope[Event]]
with BySlicesSourceProvider
with EventTimestampQuery
with LoadEventQuery {
implicit val executionContext: ExecutionContext = system.executionContext
override def source(offset: () => Future[Option[Offset]])
: Future[Source[pekko.persistence.query.typed.EventEnvelope[Event], NotUsed]] =
offset().map { offsetOpt =>
val offset = offsetOpt.getOrElse(NoOffset)
eventsBySlicesQuery.eventsBySlices(entityType, minSlice, maxSlice, offset)
}
override def extractOffset(envelope: pekko.persistence.query.typed.EventEnvelope[Event]): Offset = envelope.offset
override def extractCreationTime(envelope: pekko.persistence.query.typed.EventEnvelope[Event]): Long =
envelope.timestamp
override def timestampOf(persistenceId: String, sequenceNr: Long): Future[Option[Instant]] =
eventsBySlicesQuery match {
case timestampQuery: EventTimestampQuery =>
timestampQuery.timestampOf(persistenceId, sequenceNr)
case _ =>
Future.failed(
new IllegalStateException(
s"[${eventsBySlicesQuery.getClass.getName}] must implement [${classOf[EventTimestampQuery].getName}]"))
}
override def loadEnvelope[Evt](
persistenceId: String,
sequenceNr: Long): Future[pekko.persistence.query.typed.EventEnvelope[Evt]] =
eventsBySlicesQuery match {
case laodEventQuery: LoadEventQuery =>
laodEventQuery.loadEnvelope(persistenceId, sequenceNr)
case _ =>
Future.failed(
new IllegalStateException(
s"[${eventsBySlicesQuery.getClass.getName}] must implement [${classOf[LoadEventQuery].getName}]"))
}
}