in core/src/main/scala/org/apache/pekko/persistence/jdbc/query/scaladsl/JdbcReadJournal.scala [233:283]
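
  /**
   * Polls the journal for events tagged with `tag`, starting at `offset`, in batches of
   * `readJournalConfig.maxBufferSize`. When `terminateAfterOffset` is defined the stream completes
   * once events up to (at least) that offset have been delivered; otherwise it keeps polling as a
   * live query.
   *
   * A minimal usage sketch through the public query API that delegates here (the wiring shown is
   * illustrative, not taken from this file):
   * {{{
   * val readJournal = PersistenceQuery(system)
   *   .readJournalFor[JdbcReadJournal](JdbcReadJournal.Identifier)
   * readJournal.eventsByTag("my-tag", Sequence(0L)).runForeach(println)
   * }}}
   */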
  private def eventsByTag(
      tag: String,
      offset: Long,
      terminateAfterOffset: Option[Long]): Source[EventEnvelope, NotUsed] = {
    import pekko.pattern.ask
    import FlowControl._
    implicit val askTimeout: Timeout = Timeout(readJournalConfig.journalSequenceRetrievalConfiguration.askTimeout)
    val batchSize = readJournalConfig.maxBufferSize
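
    // Unfold state: the offset to resume from plus a FlowControl token that says whether the next
    // batch should be fetched immediately, after `refreshInterval`, or not at all.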
    Source
      .unfoldAsync[(Long, FlowControl), Seq[EventEnvelope]]((offset, Continue)) { case (from, control) =>
        def retrieveNextBatch() = {
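          // Ask the JournalSequenceActor for the highest ordering id that is currently safe to
          // read (it tracks the visible maximum, guarding against gaps from in-flight writes),
          // then fetch up to `batchSize` tagged events between `from` and that bound.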
          for {
            queryUntil <- journalSequenceActor.ask(GetMaxOrderingId).mapTo[MaxOrderingId]
            xs <- currentJournalEventsByTag(tag, from, batchSize, queryUntil).runWith(Sink.seq)
          } yield {
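            // A full batch suggests that more matching events are already available.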
            val hasMoreEvents = xs.size == batchSize
            val nextControl: FlowControl =
              terminateAfterOffset match {
                // We may stop if the target is behind queryUntil and there are no more events to fetch.
                case Some(target) if !hasMoreEvents && target <= queryUntil.maxOrdering => Stop
                // We may also stop if we have found an event with an offset >= target.
                case Some(target) if xs.exists(_.offset.value >= target) => Stop
                // Otherwise, whether Some or None, we must decide how to continue.
                case _ =>
                  if (hasMoreEvents) Continue else ContinueDelayed
              }

            val nextStartingOffset = if (xs.isEmpty) {
              /* If no events matched the tag between `from` and `maxOrdering` then there is no need
               * to execute the exact same query again. We can continue querying from `maxOrdering`,
               * which will save some load on the db. (Note: we may never return a value smaller
               * than `from`, otherwise we might return duplicate events.) */
              math.max(from, queryUntil.maxOrdering)
            } else {
              // Continue querying from the largest offset seen in this batch.
              xs.map(_.offset.value).max
            }
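            // Emit this batch and carry the updated offset and control token into the next round.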
            Some(((nextStartingOffset, nextControl), xs))
          }
        }
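
        // Act on the control token computed by the previous iteration.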
        control match {
          case Stop     => Future.successful(None)
          case Continue => retrieveNextBatch()
          case ContinueDelayed =>
            pekko.pattern.after(readJournalConfig.refreshInterval, system.scheduler)(retrieveNextBatch())
        }
      }
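      // Flatten each batch so downstream sees individual EventEnvelope elements.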
      .mapConcat(identity)
  }