final def receive()

in core/src/main/scala/org/apache/pekko/persistence/jdbc/state/scaladsl/DurableStateSequenceActor.scala [123:168]


  /**
   * Builds the actor behavior for the given tracking state. Every state change is applied by
   * calling `context.become` with a fresh invocation of this method.
   *
   * Messages handled:
   *  - `ScheduleAssumeMaxGlobalOffset(max)`: arms a single-shot timer that delivers
   *    `AssumeMaxGlobalOffset(max)` after `queryDelay * maxTries`.
   *  - `AssumeMaxGlobalOffset(max)`: moves the current max forward to `max` if it is still behind.
   *  - `GetMaxGlobalOffset`: replies to the sender with the current max wrapped in `MaxGlobalOffset`.
   *  - `QueryState`: queries the state store from the current max and pipes the outcome to `self`.
   *  - `NewStateInfo`: re-queries when the result is stale, otherwise delegates to `findGaps`.
   *  - `Status.Failure`: doubles the delay (capped at `maxBackoffQueryDelay`) and schedules a retry.
   *
   * @param currentMaxGlobalOffset offset reported to `GetMaxGlobalOffset` callers and used as the
   *                               starting point of each state-store query
   * @param missingByCounter       passed through unchanged to `findGaps` and to subsequent behaviors
   * @param moduloCounter          passed through unchanged to `findGaps` and to subsequent behaviors
   * @param previousDelay          base delay that is doubled on the next failure; defaults to `queryDelay`
   * @return the `Receive` partial function for this state
   */
  final def receive(
      currentMaxGlobalOffset: GlobalOffset,
      missingByCounter: Map[Int, MissingElements[GlobalOffset]],
      moduloCounter: Int,
      previousDelay: FiniteDuration = queryDelay): Receive = {
    case ScheduleAssumeMaxGlobalOffset(max) =>
      // Once queryDelay * maxTries has elapsed, every element below max may be assumed missing.
      timers.startSingleTimer(AssumeMaxGlobalOffsetTimerKey, AssumeMaxGlobalOffset(max), queryDelay * maxTries)

    case AssumeMaxGlobalOffset(max) =>
      // Only ever move forward; a max at or below the current one is ignored.
      if (currentMaxGlobalOffset < max) {
        context.become(receive(max, missingByCounter, moduloCounter, previousDelay))
      }

    case GetMaxGlobalOffset =>
      val reply = MaxGlobalOffset(currentMaxGlobalOffset)
      sender() ! reply

    case QueryState =>
      // The offset the query started from travels with the result so staleness can be detected later.
      val fetchedRows = stateStore
        .stateStoreStateInfo(currentMaxGlobalOffset, batchSize)
        .runWith(Sink.seq)
      val stateInfo = fetchedRows.map { rows =>
        val visited = rows.map { case (pid, offset, rev) =>
          VisitedElement(pid, offset, rev)
        }.toList
        NewStateInfo(currentMaxGlobalOffset, visited)
      }
      stateInfo.pipeTo(self)

    case NewStateInfo(originalOffset, elements) =>
      if (originalOffset < currentMaxGlobalOffset) {
        // The query ran against an offset that has been superseded in the meantime,
        // so its result is discarded and a fresh query is started.
        self ! QueryState
      } else {
        findGaps(elements, currentMaxGlobalOffset, missingByCounter, moduloCounter)
      }

    case Status.Failure(cause) =>
      // Exponential backoff: double the previous delay, never exceeding maxBackoffQueryDelay.
      val doubledDelay = previousDelay * 2
      val backoff = maxBackoffQueryDelay.min(doubledDelay)
      // Warn only once the backoff has reached its ceiling.
      val reachedCeiling = backoff == maxBackoffQueryDelay
      if (reachedCeiling) {
        log.warning("Failed to query max global offset because of {}, retrying in [{}]", cause, backoff.toCoarsest)
      }
      scheduleQuery(backoff)
      context.become(receive(currentMaxGlobalOffset, missingByCounter, moduloCounter, previousDelay = backoff))
  }