in core/src/main/scala/org/apache/pekko/persistence/jdbc/query/JournalSequenceActor.scala [125:176]
def findGaps(
    elements: Seq[OrderingId],
    currentMaxOrdering: OrderingId,
    missingByCounter: Map[Int, MissingElements[OrderingId]],
    moduloCounter: Int): Unit = {

  // list of elements that will be considered as genuine gaps.
  // `givenUp` is either empty or was filled on a previous iteration
  val givenUp = missingByCounter.getOrElse(moduloCounter, MissingElements.empty[OrderingId])
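  // for example (illustrative values only): if a previous pass with this same moduloCounter recorded
  // ids 13 and 14 as missing, `givenUp` now contains 13 and 14, and the fold below is allowed to move
  // the cursor past them instead of waiting for them to appear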

  val (nextMax, _, missingElems) =
    // using the ordering elements that were fetched, we verify if there are any gaps
    elements.foldLeft[(OrderingId, OrderingId, MissingElements[OrderingId])](
      (currentMaxOrdering, currentMaxOrdering, MissingElements.empty[OrderingId])) {
      case ((currentMax, previousElement, missing), currentElement) =>
        // we must decide if we move the cursor forward
        val newMax =
          if ((currentMax + 1).until(currentElement).forall(givenUp.contains)) {
            // we move the cursor forward when:
            // 1) all ids between currentMax and currentElement were already flagged as missing on a
            //    previous iteration, so it is time to give up on them
            // 2) currentMax + 1 == currentElement, meaning there is no gap. Note that `forall` on an
            //    empty range always returns true
            currentElement
          } else currentMax

        // we accumulate in newMissing the gaps we detect on each iteration
        val newMissing =
          if (previousElement + 1 == currentElement || newMax == currentElement) missing
          else missing.addRange(previousElement + 1, currentElement)

        (newMax, currentElement, newMissing)
    }
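
  // worked example (illustrative values only): with currentMaxOrdering = 10, elements = Seq(11, 12, 15)
  // and an empty `givenUp`, the fold above moves the cursor to 12 and records 13 and 14 as missing;
  // on a later pass, with givenUp containing 13 and 14, element 15 lets the cursor jump straight to 15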

  val newMissingByCounter = missingByCounter + (moduloCounter -> missingElems)

  // did we detect gaps in the current batch?
  val noGapsFound = missingElems.isEmpty

  // a full batch means we retrieved as many elements as the batchSize,
  // which happens when we are not yet at the end of the stream
  val isFullBatch = elements.size == batchSize
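  // for example, assuming a batchSize of 10000 (an illustrative value): a query returning 10000 ids is
  // a full batch and we are probably still catching up, while a query returning only 250 ids means we
  // have reached the current end of the journal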

  if (noGapsFound && isFullBatch) {
    // many elements have been retrieved, but none are missing;
    // we can query again immediately, which lets the actor rapidly catch up to the real max ordering
    self ! QueryOrderingIds
    context.become(receive(nextMax, newMissingByCounter, moduloCounter))
  } else {
    // either we detected gaps or we reached the end of the stream (the batch was not full);
    // in this case we keep querying, but not immediately
    scheduleQuery(queryDelay)
    context.become(receive(nextMax, newMissingByCounter, (moduloCounter + 1) % maxTries))
  }
}
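
// A minimal, standalone sketch (not part of this actor) of the same fold over plain Longs, using a
// Set[Long] in place of MissingElements. The name `findGapsSketch` and all values are illustrative
// assumptions, shown only to make the cursor/gap bookkeeping above concrete:
//
//   def findGapsSketch(elements: Seq[Long], currentMax: Long, givenUp: Set[Long]): (Long, Set[Long]) = {
//     val (nextMax, _, missing) =
//       elements.foldLeft((currentMax, currentMax, Set.empty[Long])) {
//         case ((max, prev, miss), curr) =>
//           val newMax = if ((max + 1).until(curr).forall(givenUp.contains)) curr else max
//           val newMiss = if (prev + 1 == curr || newMax == curr) miss else miss ++ (prev + 1).until(curr)
//           (newMax, curr, newMiss)
//       }
//     (nextMax, missing)
//   }
//
//   findGapsSketch(Seq(11L, 12L, 15L), 10L, Set.empty)   // (12, Set(13, 14)): gap found, cursor held at 12
//   findGapsSketch(Seq(15L), 12L, Set(13L, 14L))         // (15, Set()): gap given up, cursor jumps to 15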