@InternalApi private[pekko] def eventsByPersistenceId[T]()

in core/src/main/scala/org/apache/pekko/persistence/cassandra/query/scaladsl/CassandraReadJournal.scala [629:665]


  /**
   * INTERNAL API
   *
   * Builds a source that wraps an [[EventsByPersistenceIdStage]] for a single persistence id and
   * converts every emitted row into a `T` using the supplied `extractor`.
   *
   * The stage is created lazily through `createFutureSource`, once the statements in
   * `combinedEventsByPersistenceIdStmts` are available. Row extraction runs through `mapAsync`
   * with `querySettings.deserializationParallelism`, and the resulting source carries the
   * `querySettings.pluginDispatcher` dispatcher attribute.
   *
   * @param persistenceId      persistence id handed to the stage
   * @param fromSequenceNr     lower sequence number bound handed to the stage
   * @param toSequenceNr       upper sequence number bound handed to the stage
   * @param max                forwarded unchanged to the stage (presumably a cap on the number of
   *                           events; confirm against `EventsByPersistenceIdStage`)
   * @param refreshInterval    forwarded unchanged to the stage (presumably `None` means a
   *                           non-live query; confirm against `EventsByPersistenceIdStage`)
   * @param readProfile        name of the read profile handed to the stage's session; it is the
   *                           caller's choice and is NOT replaced by `querySettings.readProfile`
   * @param name               name attribute applied to the stage's source
   * @param extractor          converts each row emitted by the stage into a `T`
   * @param fastForwardEnabled forwarded unchanged to the stage
   * @return a source of extracted elements whose materialized value is the future stage control
   */
  @InternalApi private[pekko] def eventsByPersistenceId[T](
      persistenceId: String,
      fromSequenceNr: Long,
      toSequenceNr: Long,
      max: Long,
      refreshInterval: Option[FiniteDuration],
      readProfile: String,
      name: String,
      extractor: Extractor[T],
      fastForwardEnabled: Boolean = false): Source[T, Future[EventsByPersistenceIdStage.Control]] = {

    // The extractor is only asked to deserialize asynchronously when more than one
    // extraction may be in flight at the same time.
    val deserializeEventAsync = querySettings.deserializationParallelism > 1

    createFutureSource(combinedEventsByPersistenceIdStmts) { (cqlSession, stmts) =>
      log.debug("Creating EventByPersistentIdState graph")
      Source
        .fromGraph(
          new EventsByPersistenceIdStage(
            persistenceId,
            fromSequenceNr,
            toSequenceNr,
            max,
            refreshInterval,
            EventsByPersistenceIdStage.EventsByPersistenceIdSession(
              stmts.preparedSelectEventsByPersistenceId,
              stmts.prepareSelectHighestNr,
              stmts.preparedSelectDeletedTo,
              cqlSession,
              // Honour the profile requested by the caller; previously this parameter was
              // ignored in favour of querySettings.readProfile.
              readProfile),
            settings,
            fastForwardEnabled))
        .named(name)
    }.mapAsync(querySettings.deserializationParallelism) { row =>
      extractor.extract(row, deserializeEventAsync)
    }
      .withAttributes(ActorAttributes.dispatcher(querySettings.pluginDispatcher))
  }