in eventsourced/src/main/scala/org/apache/pekko/projection/eventsourced/javadsl/EventSourcedProvider.scala [108:177]
def sliceForPersistenceId(system: ActorSystem[_], readJournalPluginId: String, persistenceId: String): Int = {
  // Resolve the slice-capable read journal registered under the given plugin id,
  // then let that journal compute the slice for the persistence id.
  val sliceQuery =
    PersistenceQuery(system).getReadJournalFor(classOf[EventsBySliceQuery], readJournalPluginId)
  sliceQuery.sliceForPersistenceId(persistenceId)
}
/**
 * Looks up the [[EventsBySliceQuery]] read journal identified by `readJournalPluginId`
 * and returns the result of its `sliceRanges(numberOfRanges)`.
 */
def sliceRanges(
    system: ActorSystem[_],
    readJournalPluginId: String,
    numberOfRanges: Int): java.util.List[Pair[Integer, Integer]] = {
  val sliceQuery =
    PersistenceQuery(system).getReadJournalFor(classOf[EventsBySliceQuery], readJournalPluginId)
  sliceQuery.sliceRanges(numberOfRanges)
}
/**
 * INTERNAL API
 *
 * Source provider that reads events through `eventsBySlicesQuery.eventsBySlices`, passing
 * `entityType`, `minSlice` and `maxSlice` along with the offset supplied to `source`.
 * `timestampOf` and `loadEnvelope` are forwarded to the query when it implements the matching
 * query trait; otherwise the returned stage is completed exceptionally with an
 * `IllegalStateException`.
 */
@InternalApi
private class EventsBySlicesSourceProvider[Event](
    eventsBySlicesQuery: EventsBySliceQuery,
    entityType: String,
    override val minSlice: Int,
    override val maxSlice: Int,
    system: ActorSystem[_])
    extends SourceProvider[Offset, pekko.persistence.query.typed.EventEnvelope[Event]]
    with BySlicesSourceProvider
    with EventTimestampQuery
    with LoadEventQuery {

  implicit val executionContext: ExecutionContext = system.executionContext

  // Builds a stage that is already completed exceptionally with an IllegalStateException
  // carrying `message`.
  private def failedStage[T](message: String): CompletionStage[T] = {
    val promise = new CompletableFuture[T]
    promise.completeExceptionally(new IllegalStateException(message))
    promise
  }

  /**
   * Waits for the offset produced by `offsetAsync` and then creates the events-by-slices
   * source from it, falling back to `NoOffset` when the optional is empty.
   */
  override def source(offsetAsync: Supplier[CompletionStage[Optional[Offset]]])
      : CompletionStage[Source[pekko.persistence.query.typed.EventEnvelope[Event], NotUsed]] =
    offsetAsync
      .get()
      .asScala
      .map { startOffset =>
        eventsBySlicesQuery
          .eventsBySlices[Event](entityType, minSlice, maxSlice, startOffset.orElse(NoOffset))
      }
      .asJava

  /** The offset is taken directly from the envelope. */
  override def extractOffset(envelope: pekko.persistence.query.typed.EventEnvelope[Event]): Offset =
    envelope.offset

  /** The creation time is the envelope's `timestamp`. */
  override def extractCreationTime(envelope: pekko.persistence.query.typed.EventEnvelope[Event]): Long =
    envelope.timestamp

  /**
   * Forwards to the underlying query when it is an `EventTimestampQuery`; otherwise returns
   * a stage failed with an `IllegalStateException`.
   */
  override def timestampOf(persistenceId: String, sequenceNr: Long): CompletionStage[Optional[Instant]] =
    eventsBySlicesQuery match {
      case supported: EventTimestampQuery =>
        supported.timestampOf(persistenceId, sequenceNr)
      case _ =>
        failedStage[Optional[Instant]](
          s"[${eventsBySlicesQuery.getClass.getName}] must implement [${classOf[EventTimestampQuery].getName}]")
    }

  /**
   * Forwards to the underlying query when it is a `LoadEventQuery`; otherwise returns
   * a stage failed with an `IllegalStateException`.
   */
  override def loadEnvelope[Evt](
      persistenceId: String,
      sequenceNr: Long): CompletionStage[pekko.persistence.query.typed.EventEnvelope[Evt]] =
    eventsBySlicesQuery match {
      case supported: LoadEventQuery =>
        supported.loadEnvelope[Evt](persistenceId, sequenceNr)
      case _ =>
        failedStage[pekko.persistence.query.typed.EventEnvelope[Evt]](
          s"[${eventsBySlicesQuery.getClass.getName}] must implement [${classOf[LoadEventQuery].getName}]")
    }
}