def sliceForPersistenceId()

in durable-state/src/main/scala/org/apache/pekko/projection/state/javadsl/DurableStateSourceProvider.scala [100:149]


  /**
   * Resolves the slice that a persistence id belongs to, as reported by the
   * slice-based durable state store query plugin registered under the given id.
   *
   * @param system the actor system used to look up the `DurableStateStoreRegistry`
   * @param durableStateStoreQueryPluginId id of the durable state store query plugin to delegate to
   * @param persistenceId the persistence id whose slice is requested
   * @return the slice returned by the plugin's `sliceForPersistenceId`
   */
  def sliceForPersistenceId(
      system: ActorSystem[_],
      durableStateStoreQueryPluginId: String,
      persistenceId: String): Int = {
    // Look the plugin up first, then delegate the slice computation to it.
    val sliceQuery =
      DurableStateStoreRegistry(system)
        .getDurableStateStoreFor(classOf[DurableStateStoreBySliceQuery[Any]], durableStateStoreQueryPluginId)
    sliceQuery.sliceForPersistenceId(persistenceId)
  }

  /**
   * Asks the slice-based durable state store query plugin registered under the
   * given id to split its slices into the requested number of ranges.
   *
   * @param system the actor system used to look up the `DurableStateStoreRegistry`
   * @param durableStateStoreQueryPluginId id of the durable state store query plugin to delegate to
   * @param numberOfRanges how many ranges to request from the plugin
   * @return the ranges returned by the plugin's `sliceRanges`, as pairs of slice numbers
   */
  def sliceRanges(
      system: ActorSystem[_],
      durableStateStoreQueryPluginId: String,
      numberOfRanges: Int): java.util.List[Pair[Integer, Integer]] = {
    // Look the plugin up first, then delegate the range computation to it.
    val sliceQuery =
      DurableStateStoreRegistry(system)
        .getDurableStateStoreFor(classOf[DurableStateStoreBySliceQuery[Any]], durableStateStoreQueryPluginId)
    sliceQuery.sliceRanges(numberOfRanges)
  }

  /**
   * `SourceProvider` that streams `DurableStateChange`s for one entity type and
   * one slice range by delegating to a `DurableStateStoreBySliceQuery`.
   *
   * It also acts as a `DurableStateStore`, forwarding `getObject` to the same query.
   *
   * @param durableStateStoreQuery the slice-based query that all calls are delegated to
   * @param entityType entity type passed to `changesBySlices`
   * @param minSlice lower slice bound passed to `changesBySlices`
   * @param maxSlice upper slice bound passed to `changesBySlices`
   * @param system actor system that supplies the execution context for future transformations
   */
  private class DurableStateBySlicesSourceProvider[A](
      durableStateStoreQuery: DurableStateStoreBySliceQuery[A],
      entityType: String,
      override val minSlice: Int,
      override val maxSlice: Int,
      system: ActorSystem[_])
      extends SourceProvider[Offset, DurableStateChange[A]]
      with BySlicesSourceProvider
      with DurableStateStore[A] {
    // Used implicitly when mapping over the offset future in `source`.
    implicit val executionContext: ExecutionContext = system.executionContext

    /**
     * Builds the stream of changes once the stored offset has been resolved.
     * When no offset is present the stream starts from `NoOffset`.
     */
    override def source(offsetAsync: Supplier[CompletionStage[Optional[Offset]]])
        : CompletionStage[Source[DurableStateChange[A], NotUsed]] =
      offsetAsync
        .get()
        .asScala
        .map { maybeOffset =>
          val startOffset: Offset = maybeOffset.orElse(NoOffset)
          durableStateStoreQuery.changesBySlices(entityType, minSlice, maxSlice, startOffset)
        }
        .asJava

    /** The offset carried by the change itself. */
    override def extractOffset(stateChange: DurableStateChange[A]): Offset =
      stateChange.offset

    /**
     * The timestamp of an `UpdatedDurableState`.
     * Any other kind of change is rejected with an `IllegalArgumentException`.
     */
    override def extractCreationTime(stateChange: DurableStateChange[A]): Long =
      stateChange match {
        case updated: UpdatedDurableState[_] =>
          updated.timestamp
        case other =>
          // FIXME case DeletedDurableState when that is added
          val unsupportedMessage =
            s"DurableStateChange [${other.getClass.getName}] not implemented yet. Please report bug at https://github.com/apache/pekko-persistence-r2dbc/issues"
          throw new IllegalArgumentException(unsupportedMessage)
      }

    /** Forwards the lookup to the underlying durable state store query. */
    override def getObject(persistenceId: String): CompletionStage[GetObjectResult[A]] = {
      val lookup = durableStateStoreQuery.getObject(persistenceId)
      lookup
    }
  }