in core/src/main/scala/org/apache/pekko/persistence/jdbc/query/scaladsl/JdbcReadJournal.scala [206:222]
/**
 * Streams the events carrying `tag`, starting from the position encoded in `offset`.
 *
 * This variant only unwraps the [[Offset]] via `offset.value` and forwards the result to
 * another `currentEventsByTag` overload.
 * NOTE(review): `value` is not defined in this excerpt; presumably it yields the numeric
 * ordering of the offset -- confirm against its definition.
 *
 * @param tag    the tag to query for
 * @param offset the offset to start reading from
 * @return the source produced by the delegated `currentEventsByTag` call
 */
override def currentEventsByTag(tag: String, offset: Offset): Source[EventEnvelope, NotUsed] = {
  val startingFrom = offset.value
  currentEventsByTag(tag, startingFrom)
}
/**
 * Reads the tagged events between `offset` and the ordering captured in `latestOrdering`
 * and wraps each one in an [[EventEnvelope]].
 *
 * When `latestOrdering.maxOrdering` is smaller than `offset` the DAO is not called and an
 * empty source is returned.
 *
 * @param tag            the tag handed to `readJournalDao.eventsByTag`
 * @param offset         ordering passed to the DAO as the second argument
 * @param max            passed to the DAO as the fourth argument
 *                       (presumably a cap on the number of rows -- confirm against the DAO)
 * @param latestOrdering supplies the `maxOrdering` passed to the DAO as the third argument
 * @return a source with one envelope per representation returned by `adaptEvents`, each
 *         using `Sequence(ordering)` as its offset
 */
private def currentJournalEventsByTag(
    tag: String,
    offset: Long,
    max: Long,
    latestOrdering: MaxOrderingId): Source[EventEnvelope, NotUsed] = {
  val highestOrdering = latestOrdering.maxOrdering

  if (highestOrdering < offset) Source.empty
  else
    readJournalDao
      .eventsByTag(tag, offset, highestOrdering, max)
      // The DAO emits Try-wrapped rows; lift each one into a Future so mapAsync unwraps it.
      .mapAsync(1)(Future.fromTry)
      .mapConcat { case (repr, _, ordering) =>
        // adaptEvents may yield several representations per row; each gets its own envelope.
        for (adapted <- adaptEvents(repr)) yield EventEnvelope(
          Sequence(ordering),
          adapted.persistenceId,
          adapted.sequenceNr,
          adapted.payload,
          adapted.timestamp,
          adapted.metadata)
      }
}