in core/src/main/scala/org/apache/pekko/persistence/r2dbc/query/scaladsl/mysql/MySQLQueryDao.scala [47:78]
/**
 * Builds the SQL text for reading journal rows of one entity type within a slice range.
 *
 * The statement has positional `?` placeholders, in this order: the entity type, the lower
 * `db_timestamp` bound, the upper `db_timestamp` bound (only when `toDbTimestampParam` is set)
 * and the row limit. The slice bounds are written directly into the statement text.
 * Rows are filtered on `deleted = false` and ordered by `db_timestamp, seq_nr`.
 *
 * @param toDbTimestampParam when `true`, an `AND db_timestamp <= ?` condition is added
 * @param behindCurrentTime when greater than zero, rows are additionally required to be older
 *                          than the statement timestamp minus this duration (in microseconds)
 * @param backtracking when `true`, the event and meta serialization columns are left out of
 *                     the select list
 * @param minSlice lower bound of the `BETWEEN` slice condition
 * @param maxSlice upper bound of the `BETWEEN` slice condition
 * @return the statement text as produced by the `sql` interpolator in scope
 */
override def eventsBySlicesRangeSql(
    toDbTimestampParam: Boolean,
    behindCurrentTime: FiniteDuration,
    backtracking: Boolean,
    minSlice: Int,
    maxSlice: Int): String = {
  // Columns that are selected regardless of the backtracking flag.
  val commonColumns =
    s"slice, persistence_id, seq_nr, db_timestamp, $statementTimestampSql AS read_db_timestamp"

  // The trailing space of the original select list is kept in both variants.
  val projection =
    if (backtracking) {
      s"SELECT $commonColumns "
    } else {
      s"SELECT $commonColumns, event_ser_id, event_ser_manifest, event_payload, meta_ser_id, meta_ser_manifest, meta_payload "
    }

  // Optional upper timestamp bound; contributes one extra `?` placeholder when present.
  def upperBoundClause: String = {
    if (toDbTimestampParam) "AND db_timestamp <= ?"
    else ""
  }

  // Optional lag condition relative to the statement timestamp; empty for a non-positive lag.
  def lagClause: String = {
    if (behindCurrentTime <= Duration.Zero) ""
    else {
      val lagMicros = behindCurrentTime.toMicros
      s"AND db_timestamp < DATE_SUB($statementTimestampSql, INTERVAL '$lagMicros' MICROSECOND)"
    }
  }

  // The statement text is kept flush with the margin so the literal's whitespace is unchanged.
  sql"""
$projection
FROM $journalTable
WHERE entity_type = ?
AND slice BETWEEN $minSlice AND $maxSlice
AND db_timestamp >= ? $upperBoundClause $lagClause
AND deleted = false
ORDER BY db_timestamp, seq_nr
LIMIT ?"""
}