in core/src/main/scala/org/apache/pekko/persistence/jdbc/state/scaladsl/JdbcDurableStateStore.scala [144:154]
/**
 * Returns the stream of durable state changes carrying `tag`, beginning at `offset`.
 *
 * This is a thin delegation to `changesByTag`: the starting position is taken from
 * `offset.value` and `terminateAfterOffset` is passed as `None`.
 * NOTE(review): presumably `None` means the stream has no upper bound and keeps
 * following new changes; confirm against `changesByTag`.
 *
 * @param tag    the tag whose changes are requested
 * @param offset the position to start from
 * @return a source of changes for `tag`
 */
override def changes(tag: String, offset: Offset): Source[DurableStateChange[A], NotUsed] = {
  val startingFrom = offset.value
  changesByTag(tag, startingFrom, terminateAfterOffset = None)
}
/**
 * Builds a source of changes for `tag` over the range bounded below by `from` and
 * above by `queryUntil.maxOffset`.
 *
 * When the upper bound is strictly smaller than `from`, an empty source is returned
 * without calling `changesByTagFromDb`. Otherwise the elements produced by
 * `changesByTagFromDb` are each converted with `Future.fromTry` and flattened one at
 * a time via `mapAsync(1)`.
 * NOTE(review): a `Failure` element presumably fails the stream (default supervision);
 * confirm if a different strategy is configured.
 *
 * @param tag        the tag to query
 * @param from       lower bound passed to `changesByTagFromDb`
 * @param batchSize  batch size passed through to `changesByTagFromDb`
 * @param queryUntil holder of the upper offset bound (`maxOffset`)
 * @return the resulting source, or an empty one when the range is empty
 */
private def currentChangesByTag(
    tag: String,
    from: Long,
    batchSize: Long,
    queryUntil: MaxGlobalOffset): Source[DurableStateChange[A], NotUsed] = {
  val upperBound = queryUntil.maxOffset
  val rangeIsEmpty = upperBound < from

  if (rangeIsEmpty) {
    Source.empty
  } else {
    val attempts = changesByTagFromDb(tag, from, upperBound, batchSize)
    // Parallelism of 1 keeps the elements in the order they were emitted.
    attempts.mapAsync(1)(Future.fromTry)
  }
}