in durable-state/src/main/scala/org/apache/pekko/projection/state/scaladsl/DurableStateSourceProvider.scala [91:138]
/**
 * Asks the durable state query plugin registered under `durableStateStoreQueryPluginId`
 * which slice `persistenceId` belongs to.
 *
 * @param system actor system whose [[DurableStateStoreRegistry]] is used for the plugin lookup
 * @param durableStateStoreQueryPluginId id passed to the registry to obtain the plugin
 * @param persistenceId persistence id to resolve
 * @return the value returned by the plugin's `sliceForPersistenceId` for `persistenceId`
 */
def sliceForPersistenceId(
    system: ActorSystem[_],
    durableStateStoreQueryPluginId: String,
    persistenceId: String): Int = {
  // The lookup is typed as a by-slice query so that the slice API is available on the result.
  val sliceQuery = DurableStateStoreRegistry(system)
    .durableStateStoreFor[DurableStateStoreBySliceQuery[Any]](durableStateStoreQueryPluginId)
  sliceQuery.sliceForPersistenceId(persistenceId)
}
/**
 * Asks the durable state query plugin registered under `durableStateStoreQueryPluginId`
 * for its slice ranges, given the requested number of ranges.
 *
 * @param system actor system whose [[DurableStateStoreRegistry]] is used for the plugin lookup
 * @param durableStateStoreQueryPluginId id passed to the registry to obtain the plugin
 * @param numberOfRanges forwarded unchanged to the plugin's `sliceRanges`
 * @return the ranges returned by the plugin's `sliceRanges(numberOfRanges)`
 */
def sliceRanges(
    system: ActorSystem[_],
    durableStateStoreQueryPluginId: String,
    numberOfRanges: Int): immutable.Seq[Range] = {
  // The lookup is typed as a by-slice query so that the slice API is available on the result.
  val sliceQuery = DurableStateStoreRegistry(system)
    .durableStateStoreFor[DurableStateStoreBySliceQuery[Any]](durableStateStoreQueryPluginId)
  sliceQuery.sliceRanges(numberOfRanges)
}
/**
 * [[SourceProvider]] that emits the [[DurableStateChange]]s obtained from
 * `durableStateStoreQuery.changesBySlices` for `entityType`, passing `minSlice` and
 * `maxSlice` through to the query.
 *
 * It is also a [[DurableStateStore]]: `getObject` is forwarded to the same query plugin.
 */
private class DurableStateBySlicesSourceProvider[A](
    durableStateStoreQuery: DurableStateStoreBySliceQuery[A],
    entityType: String,
    override val minSlice: Int,
    override val maxSlice: Int,
    system: ActorSystem[_])
    extends SourceProvider[Offset, DurableStateChange[A]]
    with BySlicesSourceProvider
    with DurableStateStore[A] {

  // Picked up implicitly by the Future transformation in `source`.
  implicit val executionContext: ExecutionContext = system.executionContext

  /**
   * Loads the stored offset via `offset` and starts `changesBySlices` from it,
   * or from `NoOffset` when no offset has been stored.
   */
  override def source(offset: () => Future[Option[Offset]]): Future[Source[DurableStateChange[A], NotUsed]] =
    for (storedOffset <- offset()) yield {
      val startOffset = storedOffset.getOrElse(NoOffset)
      durableStateStoreQuery.changesBySlices(entityType, minSlice, maxSlice, startOffset)
    }

  /** The offset carried by the state change itself. */
  override def extractOffset(stateChange: DurableStateChange[A]): Offset =
    stateChange.offset

  /**
   * The `timestamp` of an [[UpdatedDurableState]].
   *
   * Any other kind of change is rejected with an `IllegalArgumentException`.
   */
  override def extractCreationTime(stateChange: DurableStateChange[A]): Long =
    stateChange match {
      case updated: UpdatedDurableState[_] =>
        updated.timestamp
      case other =>
        // FIXME case DeletedDurableState when that is added
        // NOTE(review): the issue tracker URL below names the r2dbc repository - confirm it is the intended one.
        throw new IllegalArgumentException(
          s"DurableStateChange [${other.getClass.getName}] not implemented yet. Please report bug at https://github.com/apache/incubator-pekko-persistence-r2dbc/issues")
    }

  /** Forwards the lookup of `persistenceId` to the underlying query plugin. */
  override def getObject(persistenceId: String): Future[GetObjectResult[A]] =
    durableStateStoreQuery.getObject(persistenceId)
}