in core/src/main/scala/org/apache/pekko/persistence/jdbc/journal/dao/BaseJournalDaoWithReadMessages.scala [48:101]
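/**
 * Streams the journal rows for `persistenceId` in windows of `batchSize`,
 * driving `Source.unfoldAsync` with a (nextFrom, FlowControl) state pair.
 * Reading starts at max(1, fromSequenceNr); when `refreshInterval` is given,
 * the stream polls again after the delay instead of completing once it has
 * caught up with `toSequenceNr`.
 */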
private[dao] def internalBatchStream(
    persistenceId: String,
    fromSequenceNr: Long,
    toSequenceNr: Long,
    batchSize: Int,
    refreshInterval: Option[(FiniteDuration, Scheduler)]) = {
  val firstSequenceNr: Long = Math.max(1, fromSequenceNr)
  Source
    .unfoldAsync[(Long, FlowControl), Seq[Try[(PersistentRepr, Long)]]]((firstSequenceNr, Continue)) {
      case (from, control) =>
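        // Upper bound of the next query window: the first window (and the
        // degenerate batchSize <= 0 and from + batchSize overflow cases) reads
        // up to toSequenceNr, later windows are capped at from + batchSize.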
        def limitWindow(from: Long): Long = {
          if (from == firstSequenceNr || batchSize <= 0 || (Long.MaxValue - batchSize) < from) {
            toSequenceNr
          } else {
            Math.min(from + batchSize, toSequenceNr)
          }
        }
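
        // Fetches one window and computes the unfoldAsync state for the next
        // iteration: where to read from next and whether to stop, continue
        // immediately, or poll again after the refresh interval.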
        def retrieveNextBatch(): Future[Option[((Long, FlowControl), Seq[Try[(PersistentRepr, Long)]])]] = {
          for {
            xs <- messages(persistenceId, from, limitWindow(from), batchSize).runWith(Sink.seq)
          } yield {
            val hasMoreEvents = xs.size == batchSize
            // Events are ordered by sequence number, therefore the last one is the largest.
            val lastSeqNrInBatch: Option[Long] = xs.lastOption match {
              case Some(Success((repr, _))) => Some(repr.sequenceNr)
              case Some(Failure(e))         => throw e // fail the returned Future
              case None                     => None
            }
            val hasLastEvent = lastSeqNrInBatch.exists(_ >= toSequenceNr)
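            // Stop once the requested range is exhausted, keep going while full
            // batches come back, and otherwise either complete (one-off query)
            // or schedule another poll (live query with a refresh interval).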
            val nextControl: FlowControl =
              if (hasLastEvent || from > toSequenceNr) Stop
              else if (hasMoreEvents) Continue
              else if (refreshInterval.isEmpty) Stop
              else ContinueDelayed
            val nextFrom: Long = lastSeqNrInBatch match {
              // Continue querying from the last sequence number (the events are ordered)
              case Some(lastSeqNr) => lastSeqNr + 1
              case None            => from
            }
            Some(((nextFrom, nextControl), xs))
          }
        }
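
        // Act on the decision taken in the previous iteration. ContinueDelayed
        // is only ever produced when refreshInterval is defined, so the .get
        // below cannot fail.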
        control match {
          case Stop     => Future.successful(None)
          case Continue => retrieveNextBatch()
          case ContinueDelayed =>
            val (delay, scheduler) = refreshInterval.get
            pekko.pattern.after(delay, scheduler)(retrieveNextBatch())
        }
    }
}
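
// A minimal, hypothetical usage sketch (not part of this file): `journalDao`,
// `system` and the implicit Materializer are assumptions made for illustration.
// It flattens the batched stream, unwraps each Try and prints sequence numbers.
//
//   import scala.concurrent.duration._
//   import org.apache.pekko.stream.Materializer
//
//   implicit val mat: Materializer = Materializer(system)
//   journalDao
//     .internalBatchStream("pid-1", fromSequenceNr = 0L, toSequenceNr = Long.MaxValue,
//       batchSize = 500, refreshInterval = Some((1.second, system.scheduler)))
//     .mapConcat(identity) // Seq[Try[(PersistentRepr, Long)]] -> single elements
//     .map(_.get)          // surface a failed row by failing the stream
//     .runForeach { case (repr, _) => println(repr.sequenceNr) }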