def receive()

in core/src/main/scala/org/apache/pekko/persistence/jdbc/query/JournalSequenceActor.scala [80:120]


  def receive(
      currentMaxOrdering: OrderingId,
      missingByCounter: Map[Int, MissingElements[OrderingId]],
      moduloCounter: Int,
      previousDelay: FiniteDuration = queryDelay): Receive = {
    case ScheduleAssumeMaxOrderingId(max) =>
      // All elements smaller than max can be assumed missing after this delay
      val delay = queryDelay * maxTries
      timers.startSingleTimer(key = AssumeMaxOrderingIdTimerKey, AssumeMaxOrderingId(max), delay)

    case AssumeMaxOrderingId(max) =>
      if (currentMaxOrdering < max) {
        context.become(receive(max, missingByCounter, moduloCounter, previousDelay))
      }

    case GetMaxOrderingId =>
      sender() ! MaxOrderingId(currentMaxOrdering)

    case QueryOrderingIds =>
      readJournalDao
        .journalSequence(currentMaxOrdering, batchSize)
        .runWith(Sink.seq)
        .map(result => NewOrderingIds(currentMaxOrdering, result))
        .pipeTo(self)

    case NewOrderingIds(originalOffset, _) if originalOffset < currentMaxOrdering =>
      // search was done using an offset that became obsolete in the meantime
      // therefore we start a new query
      self ! QueryOrderingIds

    case NewOrderingIds(_, elements) =>
      findGaps(elements, currentMaxOrdering, missingByCounter, moduloCounter)

    case Status.Failure(t) =>
      val newDelay = maxBackoffQueryDelay.min(previousDelay * 2)
      if (newDelay == maxBackoffQueryDelay) {
        log.warning("Failed to query max ordering id because of {}, retrying in {}", t, newDelay)
      }
      scheduleQuery(newDelay)
      context.become(receive(currentMaxOrdering, missingByCounter, moduloCounter, newDelay))
  }