override def currentEventsByTag()

in core/src/main/scala/org/apache/pekko/persistence/jdbc/query/scaladsl/JdbcReadJournal.scala [206:222]


  /**
   * Streams the events carrying the given tag, starting from `offset`.
   *
   * This variant only unwraps the offset: it reads `offset.value` and hands that, together
   * with `tag`, to another `currentEventsByTag` overload which does the actual work.
   *
   * NOTE(review): `offset.value` is not defined in this excerpt; it presumably converts the
   * `Offset` into the raw ordering number expected by the other overload. Confirm against
   * its definition (including how unsupported offset kinds are handled).
   *
   * @param tag    tag to look events up by
   * @param offset position to start reading from
   * @return whatever source the delegated overload produces
   */
  override def currentEventsByTag(tag: String, offset: Offset): Source[EventEnvelope, NotUsed] = {
    val startingPoint = offset.value
    currentEventsByTag(tag, startingPoint)
  }

  /**
   * Builds a source of event envelopes for `tag`, bounded above by `latestOrdering`.
   *
   * When `offset` lies beyond `latestOrdering.maxOrdering` the result is an empty source and
   * the DAO is not queried. Otherwise the rows returned by `readJournalDao.eventsByTag` are
   * unwrapped from their `Try`, passed through `adaptEvents`, and every adapted representation
   * is turned into an `EventEnvelope` whose offset is `Sequence(ordering)` of the originating
   * row.
   *
   * @param tag            tag to query the DAO with
   * @param offset         lower ordering bound handed to the DAO
   * @param max            forwarded unchanged as the last argument of `eventsByTag`
   *                       (presumably a row limit; confirm against the DAO)
   * @param latestOrdering supplies the upper ordering bound (`maxOrdering`) for the query
   * @return a source of envelopes, possibly empty
   */
  private def currentJournalEventsByTag(
      tag: String,
      offset: Long,
      max: Long,
      latestOrdering: MaxOrderingId): Source[EventEnvelope, NotUsed] = {
    val upperBound = latestOrdering.maxOrdering
    if (offset > upperBound) {
      // Requested start is past the highest known ordering: nothing to read.
      Source.empty
    } else {
      readJournalDao
        .eventsByTag(tag, offset, upperBound, max)
        // Each element is a Try; lifting it into a Future turns a Failure into a failed future.
        .mapAsync(1)(Future.fromTry)
        .mapConcat {
          case (repr, _, ordering) =>
            // One stored row may adapt to several events; all of them share the row's ordering.
            for (adapted <- adaptEvents(repr))
              yield EventEnvelope(
                Sequence(ordering),
                adapted.persistenceId,
                adapted.sequenceNr,
                adapted.payload,
                adapted.timestamp,
                adapted.metadata)
        }
    }
  }