in core/src/main/scala/org/apache/pekko/persistence/jdbc/state/scaladsl/DurableStateSequenceActor.scala [123:168]
/** Main behaviour once the actor tracks a maximum global offset.
 *
 * @param currentMaxGlobalOffset highest global offset observed (or assumed) so far
 * @param missingByCounter bookkeeping of offsets still considered missing, keyed by modulo counter
 * @param moduloCounter current counter used to age out entries in `missingByCounter`
 * @param previousDelay delay used for the last scheduled query; doubled on failure up to
 *                      `maxBackoffQueryDelay`, reset to `queryDelay` by default
 */
final def receive(
    currentMaxGlobalOffset: GlobalOffset,
    missingByCounter: Map[Int, MissingElements[GlobalOffset]],
    moduloCounter: Int,
    previousDelay: FiniteDuration = queryDelay): Receive = {
  case ScheduleAssumeMaxGlobalOffset(assumedMax) =>
    // Give in-flight writes `maxTries` polling rounds to appear; after this grace
    // period every element below `assumedMax` may be assumed missing.
    val gracePeriod = queryDelay * maxTries
    timers.startSingleTimer(
      key = AssumeMaxGlobalOffsetTimerKey,
      AssumeMaxGlobalOffset(assumedMax),
      gracePeriod)
  case AssumeMaxGlobalOffset(assumedMax) =>
    // Only ever advance the tracked offset, never move it backwards.
    if (currentMaxGlobalOffset < assumedMax) {
      context.become(receive(assumedMax, missingByCounter, moduloCounter, previousDelay))
    }
  case GetMaxGlobalOffset =>
    sender() ! MaxGlobalOffset(currentMaxGlobalOffset)
  case QueryState =>
    // Fetch the next batch of (persistenceId, globalOffset, revision) rows beyond the
    // current offset and pipe the outcome back to ourselves as a NewStateInfo message.
    stateStore
      .stateStoreStateInfo(currentMaxGlobalOffset, batchSize)
      .runWith(Sink.seq)
      .map { rows =>
        val visited = rows.map { case (pid, offset, rev) =>
          VisitedElement(pid, offset, rev)
        }.toList
        NewStateInfo(currentMaxGlobalOffset, visited)
      }
      .pipeTo(self)
  case NewStateInfo(originalOffset, _) if originalOffset < currentMaxGlobalOffset =>
    // The query ran against an offset that has since become obsolete,
    // so its result is discarded and a fresh query is issued.
    self ! QueryState
  case NewStateInfo(_, elements) =>
    findGaps(elements, currentMaxGlobalOffset, missingByCounter, moduloCounter)
  case Status.Failure(cause) =>
    // Exponential backoff capped at maxBackoffQueryDelay; warn only once the cap is reached
    // to avoid flooding the log on repeated failures.
    val backoff = (previousDelay * 2).min(maxBackoffQueryDelay)
    if (backoff == maxBackoffQueryDelay) {
      log.warning("Failed to query max global offset because of {}, retrying in [{}]", cause, backoff.toCoarsest)
    }
    scheduleQuery(backoff)
    context.become(receive(currentMaxGlobalOffset, missingByCounter, moduloCounter, backoff))
}