in core/src/main/scala/org/apache/pekko/persistence/cassandra/query/scaladsl/CassandraReadJournal.scala [629:665]
/**
 * INTERNAL API
 *
 * Builds a source that runs an [[EventsByPersistenceIdStage]] for `persistenceId` and converts
 * every element it emits with `extractor`.
 *
 * The stage is created inside `createFutureSource(combinedEventsByPersistenceIdStmts)`, which
 * supplies the session and the prepared statements handed to the stage. The whole stream is
 * given the `querySettings.pluginDispatcher` dispatcher attribute.
 *
 * @param persistenceId      persistence id whose events are read
 * @param fromSequenceNr     forwarded unchanged to the stage (presumably the first sequence number to read)
 * @param toSequenceNr       forwarded unchanged to the stage (presumably the last sequence number to read)
 * @param max                forwarded unchanged to the stage (presumably an upper bound on emitted events)
 * @param refreshInterval    forwarded unchanged to the stage; NOTE(review): `None` presumably means
 *                           "do not poll for new events" - confirm against the stage
 * @param readProfile        profile name handed to the stage's session for its reads
 *                           (presumably a driver execution profile - confirm against the stage)
 * @param name               name attribute applied to the stage's source
 * @param extractor          converts each element emitted by the stage into a `T`
 * @param fastForwardEnabled forwarded unchanged to the stage; defaults to `false`
 * @return a source of extracted values that materializes a future of the stage's control handle
 */
@InternalApi private[pekko] def eventsByPersistenceId[T](
    persistenceId: String,
    fromSequenceNr: Long,
    toSequenceNr: Long,
    max: Long,
    refreshInterval: Option[FiniteDuration],
    readProfile: String,
    name: String,
    extractor: Extractor[T],
    fastForwardEnabled: Boolean = false): Source[T, Future[EventsByPersistenceIdStage.Control]] = {

  // The extractor is told to deserialize asynchronously only when more than one
  // extraction may be in flight at the same time.
  val deserializeEventAsync = querySettings.deserializationParallelism > 1

  createFutureSource(combinedEventsByPersistenceIdStmts) { (s, c) =>
    log.debug("Creating EventByPersistentIdState graph")
    Source
      .fromGraph(
        new EventsByPersistenceIdStage(
          persistenceId,
          fromSequenceNr,
          toSequenceNr,
          max,
          refreshInterval,
          EventsByPersistenceIdStage.EventsByPersistenceIdSession(
            c.preparedSelectEventsByPersistenceId,
            c.prepareSelectHighestNr,
            c.preparedSelectDeletedTo,
            s,
            // Use the profile requested by the caller. Falling back to
            // querySettings.readProfile here would silently ignore the parameter.
            readProfile),
          settings,
          fastForwardEnabled))
      .named(name)
  }.mapAsync(querySettings.deserializationParallelism) { row =>
    extractor.extract(row, deserializeEventAsync)
  }
    .withAttributes(ActorAttributes.dispatcher(querySettings.pluginDispatcher))
}