final def findGaps()

in core/src/main/scala/org/apache/pekko/persistence/jdbc/state/scaladsl/DurableStateSequenceActor.scala [173:277]


  /**
   * Gap detection for one freshly fetched batch of rows: decides how far the max offset may be advanced,
   * updates the revision cache and switches the actor's behavior for the next round.
   *
   * Offsets after `currentMaxOffset` that are absent from the batch are treated as gaps. A gap is given up on
   * (skipped over) only if it was already recorded as missing the last time this same `moduloCounter` slot was
   * visited. When gaps remain, the method tries to explain them with revision jumps of persistence ids already
   * present in `revisionCache`: an update that skips revisions implies intermediate rows were overwritten, and
   * their offsets show up as gaps. If every gap is explained this way the max offset jumps to the highest offset
   * of the batch.
   *
   * Side effects: sends `QueryState` to `self` immediately (full batch without gaps) or calls `scheduleQuery`,
   * calls `addToRevisionCache`, and calls `context.become` with the new max offset, missing-offset map and
   * counter.
   *
   * @param elements         the rows fetched by the last query (presumably ordered by ascending offset — TODO confirm)
   * @param currentMaxOffset the highest offset considered gap-free before this batch
   * @param missingByCounter offsets recorded as missing on earlier rounds, keyed by `moduloCounter` slot
   * @param moduloCounter    the slot for this round; it is advanced modulo `maxTries` on every delayed round
   */
  final def findGaps(
      elements: List[VisitedElement],
      currentMaxOffset: GlobalOffset,
      missingByCounter: Map[Int, MissingElements[GlobalOffset]],
      moduloCounter: Int): Unit = {
    // List of offsets that will be considered genuine gaps.
    // `givenUp` is either empty or was filled on a previous visit of this moduloCounter slot.
    val givenUp = missingByCounter.getOrElse(moduloCounter, MissingElements.empty[GlobalOffset])

    val (nextMax, _, missingElems) =
      // using the global offset of the elements that were fetched, we verify if there are any gaps
      elements.foldLeft[(GlobalOffset, GlobalOffset, MissingElements[GlobalOffset])](
        (currentMaxOffset, currentMaxOffset, MissingElements.empty[GlobalOffset])) {
        case ((currentMax, previousOffset, missing), currentElement) =>
          // we must decide if we move the cursor forward
          val newMax =
            if ((currentMax + 1).until(currentElement.offset).forall(givenUp.contains)) {
              // we move the cursor forward when:
              // 1) they have been detected as missing on previous iteration, it's time now to give up
              // 2) current + 1 == currentElement (meaning no gap). Note that `forall` on an empty range always returns true
              currentElement.offset
            } else currentMax

          // we accumulate in newMissing the gaps we detect on each iteration
          val newMissing =
            if (previousOffset + 1 == currentElement.offset || newMax == currentElement.offset) missing
            else missing.addRange(previousOffset + 1, currentElement.offset)

          (newMax, currentElement.offset, newMissing)
      }

    // these offsets will be used as givenUp after one round when back to the same moduloCounter
    val newMissingByCounter = missingByCounter + (moduloCounter -> missingElems)

    // did we detect gaps in the current batch?
    val noGapsFound = missingElems.isEmpty

    // a full batch means that we retrieved as many elements as batchSize,
    // which happens when we are not yet at the end of the stream
    val isFullBatch = elements.size == batchSize

    if (noGapsFound) {
      addToRevisionCache(elements, nextMax)
      if (isFullBatch) {
        // We can query again immediately, as this allows the actor to rapidly retrieve the real max offset.
        // Using same moduloCounter.
        self ! QueryState
        context.become(receive(nextMax, newMissingByCounter, moduloCounter))
      } else {
        // keep querying but not immediately
        scheduleQuery(queryDelay)
        context.become(receive(nextMax, newMissingByCounter, (moduloCounter + 1) % maxTries))
      }
    } else {
      // We detected gaps. When there are updates to the same persistence id we might not see all subsequent
      // changes but only the latest. Those changes will be seen as gaps. By looking at the difference in revisions
      // for persistence ids that we have seen before (included in the revisionCache) we try to confirm if
      // the offset gaps can be filled by the revision changes.
      val missingOffsetCount = missingElems.size

      // Every missing range above was added starting at `previousOffset + 1`, where `previousOffset` is either
      // currentMaxOffset or the offset of an element of this batch. No missing offset can therefore be lower than
      // this bound; it is used to limit the per-pid scan below without changing its result.
      val lowestPossibleMissing =
        elements.foldLeft(currentMaxOffset)((lowest, elem) => math.min(lowest, elem.offset)) + 1

      val (inBetweenRevisionChanges, newMaxOffset, cacheMissed) =
        // in this fold we find the possibly new max offset and the total revision difference for all persistence ids
        elements.foldLeft((0L, nextMax, false)) { case ((revChg, currMaxOffset, cacheMiss), elem) =>
          revisionCache.get(elem.pid) match {
            case Some(e) =>
              // cache hit: find the revision difference
              val maxOffset = math.max(currMaxOffset, elem.offset)
              val revDiff = elem.revision - e.revision
              if (revDiff <= 1) {
                (revChg, maxOffset, cacheMiss)
              } else {
                // Offsets strictly between the cached and the current offset of this pid (e.offset and
                // elem.offset are known to not be missing). The lower bound is clamped to the window where gaps
                // can exist at all, so a pid last seen long ago does not trigger a scan over its whole offset
                // history. An inverted pair of offsets simply yields an empty range instead of throwing.
                val pidOffsets = math.max(e.offset + 1, lowestPossibleMissing).until(elem.offset)
                val missingCount = math.min(pidOffsets.count(missingElems.contains), revDiff - 1)
                (revChg + missingCount, maxOffset, cacheMiss)
              }
            case None =>
              // this persistence id was not present in the cache
              (revChg, math.max(elem.offset, currMaxOffset), cacheMiss || elem.revision != 1L)
          }
        }

      // in this case we want to keep querying but not immediately
      scheduleQuery(queryDelay)

      if (cacheMissed || missingOffsetCount != inBetweenRevisionChanges) {
        // gaps could not be confirmed

        if (log.isDebugEnabled) {
          log.debug(
            "Offset gaps detected [{}]. Current max offset [{}]. [{}] gaps could not be confirmed by revision changes.{}",
            missingElems,
            nextMax,
            missingOffsetCount - inBetweenRevisionChanges,
            if (cacheMissed) " Some new persistence ids without previously known revision." else "")
        }

        addToRevisionCache(elements, nextMax)
        context.become(receive(nextMax, newMissingByCounter, (moduloCounter + 1) % maxTries))
      } else {
        // every gap is explained by overwritten intermediate revisions: jump to the highest offset seen
        addToRevisionCache(elements, newMaxOffset)
        context.become(receive(newMaxOffset, newMissingByCounter, (moduloCounter + 1) % maxTries))
      }
    }
  }