in core/src/main/scala/org/apache/pekko/persistence/r2dbc/state/scaladsl/DurableStateDao.scala [141:170]
/**
 * Assembles the SQL text that selects durable state rows for one entity type, filtered by
 * `sliceCondition(minSlice, maxSlice)` and a `db_timestamp` window, ordered by `db_timestamp`
 * and then `revision`, and capped by a bound `LIMIT`.
 *
 * Bind placeholders written by this method appear in this textual order: entity type, inclusive
 * lower `db_timestamp` bound, the optional exclusive upper `db_timestamp` bound, and the limit.
 * Whatever `sliceCondition` emits sits between the entity type and the lower timestamp bound.
 *
 * @param maxDbTimestampParam when true, an additional `AND db_timestamp < ?` condition is included
 * @param behindCurrentTime when greater than zero, rows must also have a `db_timestamp` earlier than
 *   `transaction_timestamp()` minus this duration; the duration is inlined as milliseconds
 * @param backtracking when true, the `state_ser_id`, `state_ser_manifest` and `state_payload` columns
 *   are left out of the select list
 * @param minSlice first argument handed to `sliceCondition`
 * @param maxSlice second argument handed to `sliceCondition`
 * @return the query string produced by the `sql` interpolator
 */
private def stateBySlicesRangeSql(
    maxDbTimestampParam: Boolean,
    behindCurrentTime: FiniteDuration,
    backtracking: Boolean,
    minSlice: Int,
    maxSlice: Int): String = {

  // Optional exclusive upper bound on db_timestamp, supplied by the caller as a bind parameter.
  val upperBoundClause: String =
    if (!maxDbTimestampParam) ""
    else "AND db_timestamp < ?"

  // Optional lag behind the database's transaction timestamp; the interval is inlined in the
  // SQL text (as milliseconds) rather than bound as a parameter.
  val lagClause: String =
    if (behindCurrentTime <= Duration.Zero) ""
    else
      s"AND db_timestamp < transaction_timestamp() - interval '${behindCurrentTime.toMillis} milliseconds'"

  // Backtracking queries omit the serialized state columns.
  val projection: String =
    if (!backtracking)
      "SELECT persistence_id, revision, db_timestamp, statement_timestamp() AS read_db_timestamp, state_ser_id, state_ser_manifest, state_payload "
    else
      "SELECT persistence_id, revision, db_timestamp, statement_timestamp() AS read_db_timestamp "

  // The template's line layout is kept verbatim: whitespace inside the literal is part of the
  // text handed to the `sql` interpolator.
  sql"""
$projection
FROM $stateTable
WHERE entity_type = ?
AND ${sliceCondition(minSlice, maxSlice)}
AND db_timestamp >= ? $upperBoundClause $lagClause
ORDER BY db_timestamp, revision
LIMIT ?"""
}