in core/src/main/scala/org/apache/pekko/persistence/r2dbc/internal/BySliceQuery.scala [105:123]
/**
 * Walks the buckets that start strictly after `from` (second precision) and adds up their
 * counts until the running total reaches `atLeastCounts`.
 *
 * @param from          lower bound; buckets whose key is less than or equal to its epoch second are skipped
 * @param atLeastCounts the cumulative count that must be reached
 * @return the start of the last visited bucket plus `BucketDurationSeconds` when the total reached
 *         `atLeastCounts`, otherwise `None`. When `atLeastCounts` is already satisfied by an empty
 *         total (i.e. it is `<= 0`) no bucket is visited and the result is based on `from` itself.
 */
def findTimeForLimit(from: Instant, atLeastCounts: Int): Option[Instant] = {
  val fromEpochSeconds = from.toEpochMilli / 1000

  // NOTE(review): relies on countByBucket iterating in ascending key order — confirm at its declaration.
  val remainingBuckets = countByBucket.iterator.dropWhile { case (bucketStart, _) => fromEpochSeconds >= bucketStart }

  // Start of the most recently visited bucket; stays at `from` when nothing was visited.
  var lastBucketStart: EpochSeconds = fromEpochSeconds
  var runningTotal: Count = 0

  while (remainingBuckets.hasNext && runningTotal < atLeastCounts) {
    val (bucketStart, bucketCount) = remainingBuckets.next()
    lastBucketStart = bucketStart
    runningTotal = runningTotal + bucketCount
  }

  if (runningTotal < atLeastCounts)
    None // ran out of buckets before reaching the requested count
  else
    Some(Instant.ofEpochSecond(lastBucketStart + BucketDurationSeconds))
}