in core/src/main/scala/org/apache/pekko/persistence/jdbc/state/scaladsl/DurableStateSequenceActor.scala [173:277]
/**
 * Inspects a batch of fetched elements for gaps in their global offsets, decides how far the
 * tracked max offset may move forward, updates the revision cache and switches to the next
 * `receive` behavior (re-querying immediately or after `queryDelay`).
 *
 * @param elements         the fetched elements, folded in list order
 *                         (presumably sorted by ascending offset -- confirm against the query)
 * @param currentMaxOffset the max offset known before this batch; starting point of both folds
 * @param missingByCounter missing offsets recorded on earlier rounds, keyed by modulo counter
 * @param moduloCounter    the counter slot of the current round; the entry stored under it in
 *                         `missingByCounter` is treated as "given up" and is then overwritten with
 *                         the gaps found in this batch
 */
final def findGaps(
    elements: List[VisitedElement],
    currentMaxOffset: GlobalOffset,
    missingByCounter: Map[Int, MissingElements[GlobalOffset]],
    moduloCounter: Int): Unit = {
  // list of elements that will be considered as genuine gaps.
  // `givenUp` is either empty or was filled on a previous iteration
  val givenUp = missingByCounter.getOrElse(moduloCounter, MissingElements.empty[GlobalOffset])
  val (nextMax, _, missingElems) =
    // using the global offset of the elements that were fetched, we verify if there are any gaps
    elements.foldLeft[(GlobalOffset, GlobalOffset, MissingElements[GlobalOffset])](
      (currentMaxOffset, currentMaxOffset, MissingElements.empty[GlobalOffset])) {
      case ((currentMax, previousOffset, missing), currentElement) =>
        // we must decide if we move the cursor forward
        val newMax =
          if ((currentMax + 1).until(currentElement.offset).forall(givenUp.contains)) {
            // we move the cursor forward when:
            // 1) they have been detected as missing on previous iteration, it's time now to give up
            // 2) current + 1 == currentElement (meaning no gap). Note that `forall` on an empty range always returns true
            currentElement.offset
          } else currentMax
        // we accumulate in newMissing the gaps we detect on each iteration
        val newMissing =
          if (previousOffset + 1 == currentElement.offset || newMax == currentElement.offset) missing
          else missing.addRange(previousOffset + 1, currentElement.offset)
        (newMax, currentElement.offset, newMissing)
    }
  // these offsets will be used as givenUp after one round when back to the same moduloCounter
  val newMissingByCounter = missingByCounter + (moduloCounter -> missingElems)
  // did we detect gaps in the current batch?
  val noGapsFound = missingElems.isEmpty
  // full batch means that we retrieved as many elements as the batchSize
  // that happens when we are not yet at the end of the stream
  val isFullBatch = elements.size == batchSize
  if (noGapsFound) {
    addToRevisionCache(elements, nextMax)
    if (isFullBatch) {
      // We can query again immediately, as this allows the actor to rapidly retrieve the real max offset.
      // Using same moduloCounter.
      self ! QueryState
      context.become(receive(nextMax, newMissingByCounter, moduloCounter))
    } else {
      // keep querying but not immediately
      scheduleQuery(queryDelay)
      context.become(receive(nextMax, newMissingByCounter, (moduloCounter + 1) % maxTries))
    }
  } else {
    // We detected gaps. When there are updates to the same persistence id we might not see all subsequent
    // changes but only the latest. Those changes will be seen as gaps. By looking at the difference in revisions
    // for persistence ids that we have seen before (included in the revisionCache) we try to confirm if
    // the offset gaps can be filled by the revision changes.
    val missingOffsetCount = missingElems.size
    val (inBetweenRevisionChanges, newMaxOffset, cacheMissed) =
      // in this fold we find the possibly new max offset and the total revision difference for all persistence ids
      elements.foldLeft((0L, nextMax, false)) { case ((revChg, currMaxOffset, cacheMiss), elem) =>
        revisionCache.get(elem.pid) match {
          case Some(e) =>
            // cache hit: find the revision difference
            val maxOffset = math.max(currMaxOffset, elem.offset)
            val revDiff = elem.revision - e.revision
            if (revDiff <= 1) {
              // at most one revision step since the cached entry: no skipped revisions to account for
              (revChg, maxOffset, cacheMiss)
            } else {
              // Offsets strictly between the cached offset and the current one; e.offset and elem.offset
              // are known to not be missing. The range starts at `e.offset + 1` instead of dropping the
              // head with `.tail`, because `.tail` throws on an empty range (e.offset >= elem.offset)
              // whereas this form simply yields no offsets.
              val pidOffsets = (e.offset + 1) until elem.offset
              // a persistence id cannot explain more gaps than the number of revisions it skipped
              val missingCount = math.min(pidOffsets.count(missingElems.contains), revDiff - 1)
              (revChg + missingCount, maxOffset, cacheMiss)
            }
          case None =>
            // this persistence id was not present in the cache;
            // it only counts as a cache miss when its revision is not the first one
            (revChg, math.max(elem.offset, currMaxOffset), cacheMiss || elem.revision != 1L)
        }
      }
    // in this case we want to keep querying but not immediately
    scheduleQuery(queryDelay)
    if (cacheMissed || missingOffsetCount != inBetweenRevisionChanges) {
      // gaps could not be confirmed
      if (log.isDebugEnabled) {
        log.debug(
          "Offset gaps detected [{}]. Current max offset [{}]. [{}] gaps could not be confirmed by revision changes.{}",
          missingElems,
          nextMax,
          missingOffsetCount - inBetweenRevisionChanges,
          if (cacheMissed) " Some new persistence ids without previously known revision." else "")
      }
      // stay at the max offset computed by the gap fold; the next counter slot is used on the following round
      addToRevisionCache(elements, nextMax)
      context.become(receive(nextMax, newMissingByCounter, (moduloCounter + 1) % maxTries))
    } else {
      // every gap is accounted for by revision changes: advance to the highest offset seen in the batch
      addToRevisionCache(elements, newMaxOffset)
      context.become(receive(newMaxOffset, newMissingByCounter, (moduloCounter + 1) % maxTries))
    }
  }
}