in core/src/main/scala/org/apache/pekko/persistence/r2dbc/state/scaladsl/DurableStateDao.scala [430:470]
override def countBuckets(
entityType: String,
minSlice: Int,
maxSlice: Int,
fromTimestamp: Instant,
limit: Int): Future[Seq[Bucket]] = {
val toTimestamp = {
val now = Instant.now() // not important to use database time
if (fromTimestamp == Instant.EPOCH)
now
else {
// max buckets, just to have some upper bound
val t = fromTimestamp.plusSeconds(Buckets.BucketDurationSeconds * limit + Buckets.BucketDurationSeconds)
if (t.isAfter(now)) now else t
}
}
val result = r2dbcExecutor.select(s"select bucket counts [$minSlice - $maxSlice]")(
connection =>
connection
.createStatement(selectBucketsSql(minSlice, maxSlice))
.bind(0, entityType)
.bind(1, fromTimestamp)
.bind(2, toTimestamp)
.bind(3, limit),
row => {
val bucketStartEpochSeconds = row.get[java.lang.Long]("bucket", classOf[java.lang.Long]) * 10
val count = row.get[java.lang.Long]("count", classOf[java.lang.Long])
Bucket(bucketStartEpochSeconds, count)
})
if (log.isDebugEnabled)
result.foreach(rows =>
log.debug("Read [{}] bucket counts from slices [{} - {}]", rows.size: java.lang.Integer,
minSlice: java.lang.Integer,
maxSlice: java.lang.Integer))
result
}