in core/src/main/scala/org/apache/pekko/persistence/r2dbc/internal/BySliceQuery.scala [421:467]
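/**
 * Refresh the event count buckets that are used to estimate how far ahead the next query can
 * safely look. Returns `Some` future with the updated state when a refresh was started, or
 * `None` when the buckets were retrieved recently or already cover the configured buffer size.
 */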
private def beforeQuery(
    logPrefix: String,
    entityType: String,
    minSlice: Int,
    maxSlice: Int,
    state: QueryState): Option[Future[QueryState]] = {
  // Don't run this too frequently
  if ((state.buckets.isEmpty || JDuration
      .between(state.buckets.createdAt, Instant.now())
      .compareTo(eventBucketCountInterval) > 0) &&
    // For Durable State we always refresh the bucket counts at the interval. For Event Sourced we know
    // that they don't change because events are append only.
    (dao.countBucketsMayChange || state.buckets
      .findTimeForLimit(state.latest.timestamp, settings.querySettings.bufferSize)
      .isEmpty)) {
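
    // Select where the bucket counts should start: EPOCH when no events have been seen yet,
    // a window before the latest seen timestamp on the first backtracking pass, and otherwise
    // the latest backtracking timestamp.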
    val fromTimestamp =
      if (state.latestBacktracking.timestamp == Instant.EPOCH && state.latest.timestamp == Instant.EPOCH)
        Instant.EPOCH
      else if (state.latestBacktracking.timestamp == Instant.EPOCH)
        state.latest.timestamp.minus(firstBacktrackingQueryWindow)
      else
        state.latestBacktracking.timestamp
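
    // Fetch fresh counts from the database, dropping buckets older than fromTimestamp
    // and merging in the newly retrieved ones.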
    val futureState =
      dao.countBuckets(entityType, minSlice, maxSlice, fromTimestamp, Buckets.Limit).map { counts =>
        val newBuckets = state.buckets.clearUntil(fromTimestamp).add(counts)
        val newState = state.copy(buckets = newBuckets)
        if (log.isDebugEnabled) {
          val sum = counts.iterator.map { case Bucket(_, count) => count }.sum
          log.debug(
            "{} retrieved [{}] event count buckets, with a total of [{}], from slices [{} - {}], from time [{}]",
            logPrefix,
            counts.size: java.lang.Integer,
            sum: java.lang.Long,
            minSlice: java.lang.Integer,
            maxSlice: java.lang.Integer,
            fromTimestamp)
        }
        newState
      }
    Some(futureState)
  } else {
    // already enough buckets or retrieved recently
    None
  }
}
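
A caller would typically run the returned future before issuing the actual query, and fall through directly when `None` is returned. A minimal sketch of that pattern, under stated assumptions: `withFreshBuckets` and `nextQuery` are hypothetical names, not from this file, and an implicit `ExecutionContext` is assumed to be in scope as in the surrounding class.

// Hypothetical caller sketch, not part of BySliceQuery itself.
def withFreshBuckets(
    logPrefix: String,
    entityType: String,
    minSlice: Int,
    maxSlice: Int,
    state: QueryState)(nextQuery: QueryState => Future[QueryState]): Future[QueryState] =
  beforeQuery(logPrefix, entityType, minSlice, maxSlice, state) match {
    case Some(futureState) => futureState.flatMap(nextQuery) // wait for the refreshed bucket counts
    case None              => nextQuery(state)               // buckets fresh enough, query immediately
  }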