in core/src/main/scala/org/apache/pekko/persistence/r2dbc/internal/BySliceQuery.scala [158:208]
/**
 * The current timestamp as seen by the database, used as the time reference for
 * by-slice queries. NOTE(review): presumably reads the database clock rather than
 * the JVM clock so that offsets compare consistently — confirm in the DAO implementation.
 */
def currentDbTimestamp(): Future[Instant]
/**
 * Stream of serialized rows for the given entity type and slice range.
 *
 * @param entityType the entity type to query rows for
 * @param minSlice lower bound (inclusive) of the slice range
 * @param maxSlice upper bound of the slice range — assumed inclusive, matching `minSlice`; TODO confirm against implementation
 * @param fromTimestamp only rows at or after this timestamp — exact inclusivity is implementation-defined; confirm in DAO
 * @param toTimestamp optional upper timestamp bound; `None` presumably means unbounded — confirm in DAO
 * @param behindCurrentTime how far behind the current database time the query should stay
 * @param backtracking `true` when this query is issued in backtracking mode (see the
 *   backtracking window fields on [[BySliceQuery]])
 */
def rowsBySlices(
entityType: String,
minSlice: Int,
maxSlice: Int,
fromTimestamp: Instant,
toTimestamp: Option[Instant],
behindCurrentTime: FiniteDuration,
backtracking: Boolean): Source[SerializedRow, NotUsed]
/**
 * For Durable State we always refresh the bucket counts at the interval. For Event Sourced we know that they don't
 * change because events are append only.
 *
 * Returns `true` when previously retrieved bucket counts may become stale and must be re-queried.
 */
def countBucketsMayChange: Boolean
/**
 * Retrieve up to `limit` event buckets for the given entity type and slice range,
 * starting from `fromTimestamp`. NOTE(review): bucket semantics (time granularity,
 * ordering of the returned `Seq`) are defined by `Bucket` and the DAO implementation —
 * not visible here, confirm before relying on them.
 *
 * @param entityType the entity type to count buckets for
 * @param minSlice lower bound of the slice range
 * @param maxSlice upper bound of the slice range
 * @param fromTimestamp start timestamp for the bucket counts
 * @param limit maximum number of buckets to return
 */
def countBuckets(
entityType: String,
minSlice: Int,
maxSlice: Int,
fromTimestamp: Instant,
limit: Int): Future[Seq[Bucket]]
}
}
/**
* INTERNAL API
*/
@InternalApi private[r2dbc] class BySliceQuery[Row <: BySliceQuery.SerializedRow, Envelope](
dao: BySliceQuery.Dao[Row],
createEnvelope: (TimestampOffset, Row) => Envelope,
extractOffset: Envelope => TimestampOffset,
settings: R2dbcSettings,
log: Logger)(implicit val ec: ExecutionContext) {
import BySliceQuery._
import TimestampOffset.toTimestampOffset
// Configured backtracking window, converted from the settings' FiniteDuration to java.time.Duration.
private val backtrackingWindow = JDuration.ofMillis(settings.querySettings.backtrackingWindow.toMillis)
// Half of the backtracking window; used as a threshold elsewhere in this class (not visible in this chunk).
private val halfBacktrackingWindow = backtrackingWindow.dividedBy(2)
// Window used for the first backtracking query: the full backtracking window plus the
// configured behind-current-time lag, so the initial query also covers the lag period.
private val firstBacktrackingQueryWindow =
backtrackingWindow.plus(JDuration.ofMillis(settings.querySettings.backtrackingBehindCurrentTime.toMillis))
// Fixed 60 second interval between event bucket count queries (hard-coded, not taken from settings).
private val eventBucketCountInterval = JDuration.ofSeconds(60)
def currentBySlices(
logPrefix: String,
entityType: String,
minSlice: Int,
maxSlice: Int,
offset: Offset): Source[Envelope, NotUsed] = {