in core/src/main/scala/org/apache/pekko/persistence/jdbc/state/scaladsl/JdbcDurableStateStore.scala [150:200]
private[jdbc] def changesByTag(
tag: String,
offset: Long,
terminateAfterOffset: Option[Long]): Source[DurableStateChange[A], NotUsed] = {
val batchSize = durableStateConfig.batchSize
val startingOffsets = List.empty[Long]
implicit val askTimeout: Timeout = Timeout(durableStateConfig.stateSequenceConfig.askTimeout)
Source
.unfoldAsync[(Long, FlowControl, List[Long]), Seq[DurableStateChange[A]]]((offset, Continue, startingOffsets)) {
case (from, control, s) =>
def retrieveNextBatch() = {
for {
queryUntil <- stateSequenceActor.ask(GetMaxGlobalOffset).mapTo[MaxGlobalOffset]
xs <- currentChangesByTag(tag, from, batchSize, queryUntil).runWith(Sink.seq)
} yield {
val hasMoreEvents = xs.size == batchSize
val nextControl: FlowControl =
terminateAfterOffset match {
// we may stop if target is behind queryUntil and we don't have more events to fetch
case Some(target) if !hasMoreEvents && target <= queryUntil.maxOffset => Stop
// We may also stop if we have found an event with an offset >= target
case Some(target) if xs.exists(_.offset.value >= target) => Stop
// otherwise, disregarding if Some or None, we must decide how to continue
case _ =>
if (hasMoreEvents) Continue
else ContinueDelayed
}
val nextStartingOffset = if (xs.isEmpty) {
math.max(from.value, queryUntil.maxOffset)
} else {
// Continue querying from the largest offset
xs.map(_.offset.value).max
}
Some(((nextStartingOffset, nextControl, s :+ nextStartingOffset), xs))
}
}
control match {
case Stop => Future.successful(None)
case Continue => retrieveNextBatch()
case ContinueDelayed =>
pekko.pattern.after(durableStateConfig.refreshInterval, system.scheduler)(retrieveNextBatch())
}
}
.mapConcat(identity)
}