in core/src/main/scala/org/apache/pekko/persistence/r2dbc/query/scaladsl/QueryDao.scala [80:111]
/**
 * Assembles the SELECT statement used to read journal rows of a single entity type
 * within a slice range, ordered by `db_timestamp` and then `seq_nr`.
 *
 * Rows flagged as deleted are filtered out (`deleted = false`). The template visible here
 * carries `?` placeholders for the entity type, the inclusive lower `db_timestamp` bound,
 * the optional inclusive upper `db_timestamp` bound and the row limit; whether
 * `sliceCondition` contributes further placeholders is not visible from this method.
 *
 * @param toDbTimestampParam when `true`, an additional `db_timestamp <= ?` condition is emitted
 * @param behindCurrentTime  when greater than zero, rows are restricted to those whose
 *                           `db_timestamp` is older than `transaction_timestamp()` minus this
 *                           duration (rendered in milliseconds); otherwise no such condition is added
 * @param backtracking       when `true`, the serialized event and metadata columns are left out
 *                           of the projection
 * @param minSlice           lower slice bound, passed to `sliceCondition`
 * @param maxSlice           upper slice bound, passed to `sliceCondition`
 * @return the statement text as produced by the `sql` interpolator
 */
protected def eventsBySlicesRangeSql(
    toDbTimestampParam: Boolean,
    behindCurrentTime: FiniteDuration,
    backtracking: Boolean,
    minSlice: Int,
    maxSlice: Int): String = {

  // Optional upper timestamp bound; adds one more placeholder when requested.
  val upperBoundClause: String =
    if (!toDbTimestampParam) ""
    else "AND db_timestamp <= ?"

  // Optional "lag behind now" filter; the duration is inlined as a millisecond literal.
  val lagClause: String =
    if (behindCurrentTime <= Duration.Zero) ""
    else
      s"AND db_timestamp < transaction_timestamp() - interval '${behindCurrentTime.toMillis} milliseconds'"

  // Projection without the serialized event/metadata columns, used for backtracking.
  val slimProjection =
    "SELECT slice, persistence_id, seq_nr, db_timestamp, statement_timestamp() AS read_db_timestamp "

  // Projection including the serialized event and metadata columns.
  val fullProjection =
    "SELECT slice, persistence_id, seq_nr, db_timestamp, statement_timestamp() AS read_db_timestamp, event_ser_id, event_ser_manifest, event_payload, meta_ser_id, meta_ser_manifest, meta_payload "

  val projection: String = if (backtracking) slimProjection else fullProjection

  // The template lines are kept flush-left: whitespace inside the literal is part of the
  // text handed to the `sql` interpolator.
  sql"""
$projection
FROM $journalTable
WHERE entity_type = ?
AND ${sliceCondition(minSlice, maxSlice)}
AND db_timestamp >= ? $upperBoundClause $lagClause
AND deleted = false
ORDER BY db_timestamp, seq_nr
LIMIT ?"""
}