in core/src/main/scala/org/apache/pekko/persistence/jdbc/query/JournalSequenceActor.scala [80:120]
/**
 * Main message loop of the journal-sequence tracking actor.
 *
 * The actor's mutable state (current maximum ordering id, bookkeeping of missing
 * ids, modulo counter, and the current backoff delay) is carried as parameters and
 * advanced via `context.become` rather than `var`s.
 *
 * @param currentMaxOrdering highest ordering id confirmed so far
 * @param missingByCounter   ordering ids observed as gaps, keyed by the modulo counter
 *                           at which they were first detected
 * @param moduloCounter      counter used to age out entries in `missingByCounter`
 * @param previousDelay      delay used for the last scheduled query; doubled on failure
 *                           up to `maxBackoffQueryDelay`, reset to `queryDelay` by default
 */
def receive(
currentMaxOrdering: OrderingId,
missingByCounter: Map[Int, MissingElements[OrderingId]],
moduloCounter: Int,
previousDelay: FiniteDuration = queryDelay): Receive = {
case ScheduleAssumeMaxOrderingId(max) =>
// All elements smaller than max can be assumed missing after this delay
val delay = queryDelay * maxTries
timers.startSingleTimer(key = AssumeMaxOrderingIdTimerKey, AssumeMaxOrderingId(max), delay)
case AssumeMaxOrderingId(max) =>
// Only fast-forward; if the tracked maximum already caught up past `max`,
// the assumption is stale and the current state is kept.
if (currentMaxOrdering < max) {
context.become(receive(max, missingByCounter, moduloCounter, previousDelay))
}
case GetMaxOrderingId =>
sender() ! MaxOrderingId(currentMaxOrdering)
case QueryOrderingIds =>
// Result (or failure) is piped back to self as NewOrderingIds / Status.Failure.
readJournalDao
.journalSequence(currentMaxOrdering, batchSize)
.runWith(Sink.seq)
.map(result => NewOrderingIds(currentMaxOrdering, result))
.pipeTo(self)
// NOTE: this guarded case must stay before the unguarded NewOrderingIds case below.
case NewOrderingIds(originalOffset, _) if originalOffset < currentMaxOrdering =>
// search was done using an offset that became obsolete in the meantime
// therefore we start a new query
self ! QueryOrderingIds
case NewOrderingIds(_, elements) =>
findGaps(elements, currentMaxOrdering, missingByCounter, moduloCounter)
case Status.Failure(t) =>
// Exponential backoff, capped at maxBackoffQueryDelay.
val newDelay = maxBackoffQueryDelay.min(previousDelay * 2)
if (newDelay == maxBackoffQueryDelay) {
// Only warn once the cap is reached (i.e. on every retry at max backoff),
// keeping transient short-delay failures quiet.
log.warning("Failed to query max ordering id because of {}, retrying in {}", t, newDelay)
}
scheduleQuery(newDelay)
context.become(receive(currentMaxOrdering, missingByCounter, moduloCounter, newDelay))
}