in core/src/main/scala/org/apache/pekko/persistence/r2dbc/state/scaladsl/mysql/MySQLDurableStateDao.scala [60:89]
/**
 * Builds the MySQL query text that reads durable-state rows of one entity type within a slice range,
 * ordered by `db_timestamp` and then `revision`.
 *
 * Bind placeholders (`?`) appear in this order: entity type, lower `db_timestamp` bound (inclusive),
 * upper `db_timestamp` bound (exclusive, present only when `maxDbTimestampParam` is set), row limit.
 *
 * @param maxDbTimestampParam when `true`, an extra `AND db_timestamp < ?` condition is added
 * @param behindCurrentTime when greater than zero, rows are further restricted to those whose `db_timestamp`
 *   is earlier than `NOW(6)` minus this duration (rendered in microseconds)
 * @param backtracking when `true`, the serializer id, manifest and payload columns are left out of the select list
 * @param minSlice lower end of the `BETWEEN` slice range, inlined into the SQL text
 * @param maxSlice upper end of the `BETWEEN` slice range, inlined into the SQL text
 * @return the query string produced by the `sql` interpolator
 */
override def stateBySlicesRangeSql(
    maxDbTimestampParam: Boolean,
    behindCurrentTime: FiniteDuration,
    backtracking: Boolean,
    minSlice: Int,
    maxSlice: Int): String = {

  // Columns returned by every variant of the query; NOW(6) is exposed as read_db_timestamp.
  val commonColumns = "SELECT persistence_id, revision, db_timestamp, NOW(6) AS read_db_timestamp"

  // Both variants end with a single trailing space, exactly as the query template expects.
  val projection =
    if (backtracking) s"$commonColumns "
    else s"$commonColumns, state_ser_id, state_ser_manifest, state_payload "

  // Optional exclusive upper bound, supplied by the caller as an additional bind parameter.
  val upperBoundClause =
    if (maxDbTimestampParam) "AND db_timestamp < ?" else ""

  // Optional lag window: only rows older than "now minus behindCurrentTime" qualify.
  val lagClause =
    if (behindCurrentTime <= Duration.Zero) ""
    else s"AND db_timestamp < DATE_SUB(NOW(6), INTERVAL '${behindCurrentTime.toMicros}' MICROSECOND)"

  // The line layout of the literal is kept exactly as it was: whitespace inside a triple-quoted
  // string is part of the text handed to the `sql` interpolator.
  sql"""
$projection
FROM $stateTable
WHERE entity_type = ?
AND slice BETWEEN $minSlice AND $maxSlice
AND db_timestamp >= ? $upperBoundClause $lagClause
ORDER BY db_timestamp, revision
LIMIT ?"""
}