in core/src/main/scala/org/apache/pekko/persistence/jdbc/query/scaladsl/JdbcReadJournal.scala [143:202]
  override def currentEventsByPersistenceId(
      persistenceId: String,
      fromSequenceNr: Long,
      toSequenceNr: Long): Source[EventEnvelope, NotUsed] =
    eventsByPersistenceIdSource(persistenceId, fromSequenceNr, toSequenceNr, None)

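  // A minimal sketch of running the bounded query above (assumes `PersistenceQuery`
  // is imported and an implicit `ActorSystem` named `system` is in scope to
  // materialize the stream; the persistence id is a placeholder). The stream
  // completes after the last currently stored event in the range has been emitted:
  //
  //   PersistenceQuery(system)
  //     .readJournalFor[JdbcReadJournal](JdbcReadJournal.Identifier)
  //     .currentEventsByPersistenceId("some-persistence-id", 0L, Long.MaxValue)
  //     .runForeach(env => println(env.event))
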
  /**
   * `eventsByPersistenceId` is used to retrieve a stream of events for a particular persistenceId.
   *
   * The `EventEnvelope` contains the event and provides `persistenceId` and `sequenceNr`
   * for each event. The `sequenceNr` is the sequence number for the persistent actor with the
   * `persistenceId` that persisted the event. The `persistenceId` + `sequenceNr` is a unique
   * identifier for the event.
   *
   * `fromSequenceNr` and `toSequenceNr` can be specified to limit the set of returned events.
   * Both `fromSequenceNr` and `toSequenceNr` are inclusive.
   *
   * The `EventEnvelope` also provides the `offset` that corresponds to the `ordering` column in
   * the Journal table. The `ordering` is a sequential id number that uniquely identifies the
   * position of each event, also across different `persistenceId`s. The `Offset` type is
   * `org.apache.pekko.persistence.query.Sequence` with the `ordering` as the offset value. This is the
   * same `ordering` number as is used in the offset of the `eventsByTag` query.
   *
   * The returned event stream is ordered by `sequenceNr`.
   *
   * Causality is guaranteed (`sequenceNr`s of events for a particular `persistenceId` are always ordered
   * in a sequence monotonically increasing by one). Multiple executions of the same bounded stream are
   * guaranteed to emit exactly the same stream of events.
   *
   * The stream is not completed when it reaches the end of the currently stored events,
   * but it continues to push new events as they are persisted.
   * The corresponding query that completes when it reaches the end of the currently
   * stored events is provided by `currentEventsByPersistenceId`.
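   *
   * A minimal usage sketch, assuming an implicit `ActorSystem` named `system` is in
   * scope to materialize the stream (the persistence id is a placeholder):
   * {{{
   * val queries = PersistenceQuery(system)
   *   .readJournalFor[JdbcReadJournal](JdbcReadJournal.Identifier)
   * queries
   *   .eventsByPersistenceId("some-persistence-id", 0L, Long.MaxValue)
   *   .runForeach(env => println(s"${env.sequenceNr}: ${env.event}"))
   * }}}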
   */
  override def eventsByPersistenceId(
      persistenceId: String,
      fromSequenceNr: Long,
      toSequenceNr: Long): Source[EventEnvelope, NotUsed] =
    eventsByPersistenceIdSource(
      persistenceId,
      fromSequenceNr,
      toSequenceNr,
      Some(readJournalConfig.refreshInterval -> system.scheduler))

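  // The polling interval for the live query above is read from the plugin's
  // configuration, e.g. in application.conf (key as in the plugin's reference.conf;
  // the value here is just an illustration):
  //
  //   jdbc-read-journal {
  //     refresh-interval = 1s
  //   }
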
  private def eventsByPersistenceIdSource(
      persistenceId: String,
      fromSequenceNr: Long,
      toSequenceNr: Long,
      refreshInterval: Option[(FiniteDuration, Scheduler)]): Source[EventEnvelope, NotUsed] = {
    val batchSize = readJournalConfig.maxBufferSize
    readJournalDao
      .messagesWithBatch(persistenceId, fromSequenceNr, toSequenceNr, batchSize, refreshInterval)
      // Lift each Try into the stream: a failed deserialization fails the stream.
      .mapAsync(1)(reprAndOrdNr => Future.fromTry(reprAndOrdNr))
      // An event adapter may emit zero or more events per stored representation;
      // each adapted event keeps the ordering number of the row it came from.
      .mapConcat { case (repr, ordNr) =>
        adaptEvents(repr).map(_ -> ordNr)
      }
      .map { case (repr, ordNr) =>
        EventEnvelope(Sequence(ordNr), repr.persistenceId, repr.sequenceNr, repr.payload, repr.timestamp, repr.metadata)
      }
  }
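
  // A sketch of resuming a consumer after a restart: since `fromSequenceNr` is
  // inclusive, pass the last processed sequence number + 1. `loadCheckpoint`,
  // `saveCheckpoint` and `handle` are hypothetical application-level helpers,
  // and `readJournal` is an instance obtained via `PersistenceQuery`:
  //
  //   val lastSeqNr: Long = loadCheckpoint("some-persistence-id")
  //   readJournal
  //     .eventsByPersistenceId("some-persistence-id", lastSeqNr + 1, Long.MaxValue)
  //     .runForeach { env =>
  //       handle(env.event)
  //       saveCheckpoint("some-persistence-id", env.sequenceNr)
  //     }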