in core/src/main/scala/org/apache/pekko/persistence/r2dbc/state/scaladsl/DurableStateDao.scala [340:400]
/**
 * Selects durable state rows of one entity type for a slice range and exposes them as a stream.
 *
 * The SQL text comes from `stateBySlicesRangeSql`. `entityType` and `fromTimestamp` are bound as the first two
 * statement parameters, followed by `toTimestamp` when it is defined, and finally the configured query buffer size.
 * All rows are collected by `r2dbcExecutor.select` before the returned source emits them.
 *
 * @param entityType bound as statement parameter 0
 * @param minSlice first slice of the range; forwarded to the SQL builder and used in the query description and log
 * @param maxSlice last slice of the range; forwarded to the SQL builder and used in the query description and log
 * @param fromTimestamp bound as statement parameter 1
 * @param toTimestamp optional timestamp; when defined it is bound as statement parameter 2 and the SQL builder is
 *   asked to include the corresponding parameter
 * @param behindCurrentTime forwarded unchanged to the SQL builder
 * @param backtracking when `true` the rows are created without payload, serializer id and serializer manifest
 * @return a source emitting the selected rows, with `NotUsed` as materialized value
 */
override def rowsBySlices(
    entityType: String,
    minSlice: Int,
    maxSlice: Int,
    fromTimestamp: Instant,
    toTimestamp: Option[Instant],
    behindCurrentTime: FiniteDuration,
    backtracking: Boolean): Source[SerializedStateRow, NotUsed] = {
  val selectedRows = r2dbcExecutor.select(s"select stateBySlices [$minSlice - $maxSlice]")(
    connection => {
      val sql = stateBySlicesRangeSql(
        maxDbTimestampParam = toTimestamp.isDefined,
        behindCurrentTime,
        backtracking,
        minSlice,
        maxSlice)
      val statement = connection.createStatement(sql).bind(0, entityType).bind(1, fromTimestamp)

      // The buffer size is always the last parameter, so its index moves up by one
      // when the optional upper timestamp occupies index 2.
      toTimestamp.foreach(until => statement.bind(2, until))
      val bufferSizeIndex = if (toTimestamp.isDefined) 3 else 2
      statement.bind(bufferSizeIndex, settings.querySettings.bufferSize)
      statement
    },
    row => {
      // Columns read for both kinds of rows.
      val persistenceId = row.get("persistence_id", classOf[String])
      val revision = row.get[java.lang.Long]("revision", classOf[java.lang.Long])
      val dbTimestamp = row.get("db_timestamp", classOf[Instant])
      val readDbTimestamp = row.get("read_db_timestamp", classOf[Instant])

      if (backtracking)
        SerializedStateRow(
          persistenceId = persistenceId,
          revision = revision,
          dbTimestamp = dbTimestamp,
          readDbTimestamp = readDbTimestamp,
          payload = null, // lazy loaded for backtracking
          serId = 0,
          serManifest = "",
          tags = Set.empty // tags not fetched in queries (yet)
        )
      else
        SerializedStateRow(
          persistenceId = persistenceId,
          revision = revision,
          dbTimestamp = dbTimestamp,
          readDbTimestamp = readDbTimestamp,
          payload = row.get("state_payload", classOf[Array[Byte]]),
          serId = row.get[Integer]("state_ser_id", classOf[Integer]),
          serManifest = row.get("state_ser_manifest", classOf[String]),
          tags = Set.empty // tags not fetched in queries (yet)
        )
    })

  // Only register the logging callback when debug output is actually enabled.
  if (log.isDebugEnabled)
    selectedRows.foreach { rows =>
      log.debug(
        "Read [{}] durable states from slices [{} - {}]",
        rows.size: java.lang.Integer,
        minSlice: java.lang.Integer,
        maxSlice: java.lang.Integer)
    }

  val futureRowSource = selectedRows.map(rows => Source(rows))
  Source.futureSource(futureRowSource).mapMaterializedValue(_ => NotUsed)
}