def findGaps()

in core/src/main/scala/org/apache/pekko/persistence/jdbc/query/JournalSequenceActor.scala [125:176]


  /**
   * Scans a freshly fetched batch of ordering ids for gaps, works out how far the max-ordering
   * cursor may advance, and switches the actor to its next behavior.
   *
   * When the batch is full and contains no gaps, another query is triggered immediately and the
   * modulo counter is kept. Otherwise the next query is scheduled after `queryDelay` and the
   * modulo counter moves to its next slot (wrapping at `maxTries`).
   *
   * @param elements ordering ids fetched by the latest query
   * @param currentMaxOrdering value of the cursor before this batch is processed
   * @param missingByCounter missing elements recorded per modulo counter on earlier iterations
   * @param moduloCounter slot of `missingByCounter` that is read, and then overwritten, by this call
   */
  def findGaps(
      elements: Seq[OrderingId],
      currentMaxOrdering: OrderingId,
      missingByCounter: Map[Int, MissingElements[OrderingId]],
      moduloCounter: Int): Unit = {
    // Ids recorded under this counter on an earlier pass (possibly none). A gap made up only of
    // such ids is treated as genuine and no longer blocks the cursor.
    val abandoned = missingByCounter.getOrElse(moduloCounter, MissingElements.empty[OrderingId])

    // Fold state: (cursor, previously visited id, gaps detected so far in this batch).
    type Scan = (OrderingId, OrderingId, MissingElements[OrderingId])

    def step(state: Scan, id: OrderingId): Scan = {
      val (cursor, prior, gaps) = state

      // The cursor may jump to `id` when every id strictly between cursor and `id` was already
      // given up on. An empty range (cursor + 1 == id, i.e. no gap) satisfies `forall` trivially.
      val gapIsSettled = (cursor + 1).until(id).forall(abandoned.contains)
      val advanced = if (gapIsSettled) id else cursor

      // A gap is recorded only when `id` does not directly follow the previous id
      // and the cursor could not be moved onto `id`.
      val isUnresolvedGap = prior + 1 != id && advanced != id
      val updatedGaps = if (isUnresolvedGap) gaps.addRange(prior + 1, id) else gaps

      (advanced, id, updatedGaps)
    }

    val start: Scan = (currentMaxOrdering, currentMaxOrdering, MissingElements.empty[OrderingId])
    val (nextMax, _, missingElems) = elements.foldLeft(start)(step)

    // Remember what this pass found missing under the current counter slot.
    val updatedMissingByCounter = missingByCounter.updated(moduloCounter, missingElems)

    // No gap was detected in this batch.
    val batchIsClean = missingElems.isEmpty
    // As many elements as `batchSize` were retrieved, so the end of the stream is not reached yet.
    val batchIsFull = elements.size == batchSize

    if (!batchIsClean || !batchIsFull) {
      // Gaps were detected or the end of the stream was reached (partial batch):
      // keep querying, but only after the configured delay, and move on to the next counter slot.
      scheduleQuery(queryDelay)
      context.become(receive(nextMax, updatedMissingByCounter, (moduloCounter + 1) % maxTries))
    } else {
      // A full batch without any gap: query again right away so the actor
      // catches up with the real max ordering quickly.
      self ! QueryOrderingIds
      context.become(receive(nextMax, updatedMissingByCounter, moduloCounter))
    }
  }