override def currentEventsByPersistenceId()

in core/src/main/scala/org/apache/pekko/persistence/jdbc/query/scaladsl/JdbcReadJournal.scala [140:199]


  /**
   * Bounded counterpart of `eventsByPersistenceId`: delegates to the shared
   * `eventsByPersistenceIdSource` without a refresh interval/scheduler pair.
   *
   * @param persistenceId  the persistence id whose events are requested
   * @param fromSequenceNr lower sequence number bound, forwarded unchanged
   * @param toSequenceNr   upper sequence number bound, forwarded unchanged
   * @return a source of event envelopes for the given persistence id
   */
  override def currentEventsByPersistenceId(
      persistenceId: String,
      fromSequenceNr: Long,
      toSequenceNr: Long): Source[EventEnvelope, NotUsed] =
    eventsByPersistenceIdSource(
      persistenceId = persistenceId,
      fromSequenceNr = fromSequenceNr,
      toSequenceNr = toSequenceNr,
      // No (interval, scheduler) pair is supplied for the "current" query.
      refreshInterval = None)

  /**
   * `eventsByPersistenceId` is used to retrieve a stream of events for a particular persistenceId.
   *
   * The `EventEnvelope` contains the event and provides `persistenceId` and `sequenceNr`
   * for each event. The `sequenceNr` is the sequence number for the persistent actor with the
   * `persistenceId` that persisted the event. The `persistenceId` + `sequenceNr` is a unique
   * identifier for the event.
   *
   * `fromSequenceNr` and `toSequenceNr` can be specified to limit the set of returned events.
   * The `fromSequenceNr` and `toSequenceNr` are inclusive.
   *
   * The `EventEnvelope` also provides the `offset` that corresponds to the `ordering` column in
   * the Journal table. The `ordering` is a sequential id number that uniquely identifies the
   * position of each event, also across different `persistenceId`. The `Offset` type is
   * `org.apache.pekko.persistence.query.Sequence` with the `ordering` as the offset value. This is the
   * same `ordering` number as is used in the offset of the `eventsByTag` query.
   *
   * The returned event stream is ordered by `sequenceNr`.
   *
   * Causality is guaranteed (`sequenceNr`s of events for a particular `persistenceId` are always ordered
   * in a sequence monotonically increasing by one). Multiple executions of the same bounded stream are
   * guaranteed to emit exactly the same stream of events.
   *
   * The stream is not completed when it reaches the end of the currently stored events,
   * but it continues to push new events when new events are persisted.
   * The corresponding query that completes when it reaches the end of the currently
   * stored events is provided by `currentEventsByPersistenceId`.
   */
  override def eventsByPersistenceId(
      persistenceId: String,
      fromSequenceNr: Long,
      toSequenceNr: Long): Source[EventEnvelope, NotUsed] = {
    // Pair the configured refresh interval with the actor system's scheduler and
    // hand both to the shared source builder.
    val polling = Some((readJournalConfig.refreshInterval, system.scheduler))
    eventsByPersistenceIdSource(persistenceId, fromSequenceNr, toSequenceNr, polling)
  }

  /**
   * Shared implementation behind `currentEventsByPersistenceId` and `eventsByPersistenceId`.
   *
   * Reads the stored messages for `persistenceId` from the DAO in batches of
   * `readJournalConfig.maxBufferSize`, unwraps each `Try`, runs every stored
   * representation through `adaptEvents`, and wraps each adapted event in an
   * `EventEnvelope` whose offset is a `Sequence` of the row's ordering number.
   *
   * @param persistenceId   the persistence id whose events are read
   * @param fromSequenceNr  lower sequence number bound, forwarded to the DAO
   * @param toSequenceNr    upper sequence number bound, forwarded to the DAO
   * @param refreshInterval optional (interval, scheduler) pair, forwarded to the DAO as-is
   * @return a source of event envelopes
   */
  private def eventsByPersistenceIdSource(
      persistenceId: String,
      fromSequenceNr: Long,
      toSequenceNr: Long,
      refreshInterval: Option[(FiniteDuration, Scheduler)]): Source[EventEnvelope, NotUsed] = {
    val maxBatch = readJournalConfig.maxBufferSize
    val storedMessages = readJournalDao.messagesWithBatch(
      persistenceId,
      fromSequenceNr,
      toSequenceNr,
      maxBatch,
      refreshInterval)

    storedMessages
      // Each element is a Try; turning it into an already-completed Future lets
      // mapAsync unwrap a success and surface a failure through the stream.
      .mapAsync(1)(Future.fromTry(_))
      .mapConcat { case (storedRepr, ordering) =>
        // Every event produced by the adapter keeps the ordering number of its stored row.
        for (adapted <- adaptEvents(storedRepr)) yield (adapted, ordering)
      }
      .map { case (adapted, ordering) =>
        val offset = Sequence(ordering)
        EventEnvelope(
          offset,
          adapted.persistenceId,
          adapted.sequenceNr,
          adapted.payload,
          adapted.timestamp,
          adapted.metadata)
      }
  }