in core/src/main/scala/org/apache/pekko/persistence/r2dbc/query/scaladsl/QueryDao.scala [169:234]
/**
 * Runs the "events by slices" range query for `entityType` over the slice range
 * `minSlice` to `maxSlice` and exposes the fetched rows as a `Source`.
 *
 * Statement parameters are bound positionally: the entity type (0), `fromTimestamp` (1),
 * `toTimestamp` (2, only when defined) and finally `settings.querySettings.bufferSize`
 * at the next free index. The SQL text comes from `eventsBySlicesRangeSql`, which is not
 * visible here; the buffer size is presumably the row limit of the query (to be confirmed
 * against that SQL).
 *
 * @param entityType        entity type bound as the first statement parameter and copied
 *                          into every returned row
 * @param minSlice          lower slice bound, forwarded to `eventsBySlicesRangeSql`
 * @param maxSlice          upper slice bound, forwarded to `eventsBySlicesRangeSql`
 * @param fromTimestamp     timestamp bound as the second statement parameter
 * @param toTimestamp       optional timestamp; when defined it is bound as the third
 *                          statement parameter
 * @param behindCurrentTime forwarded unchanged to `eventsBySlicesRangeSql`
 * @param backtracking      when `true` the rows are built without payload, serializer
 *                          information and metadata; when `false` those columns are read
 * @return a `Source` that emits the rows produced by the query once it has completed
 */
def rowsBySlices(
    entityType: String,
    minSlice: Int,
    maxSlice: Int,
    fromTimestamp: Instant,
    toTimestamp: Option[Instant],
    behindCurrentTime: FiniteDuration,
    backtracking: Boolean): Source[SerializedJournalRow, NotUsed] = {
  val result = r2dbcExecutor.select(s"select eventsBySlices [$minSlice - $maxSlice]")(
    connection => {
      val sql = eventsBySlicesRangeSql(
        toDbTimestampParam = toTimestamp.isDefined,
        behindCurrentTime,
        backtracking,
        minSlice,
        maxSlice)
      val stmt = connection.createStatement(sql).bind(0, entityType).bind(1, fromTimestamp)

      // The optional upper timestamp occupies index 2 and pushes the buffer size to index 3.
      val bufferSizeIndex = toTimestamp match {
        case Some(until) =>
          stmt.bind(2, until)
          3
        case None =>
          2
      }
      stmt.bind(bufferSizeIndex, settings.querySettings.bufferSize)
      stmt
    },
    row => {
      // Columns that are read for both the backtracking and the regular query.
      val rowSlice = row.get[Integer]("slice", classOf[Integer])
      val pid = row.get("persistence_id", classOf[String])
      val sequenceNr = row.get[java.lang.Long]("seq_nr", classOf[java.lang.Long])
      val writeTimestamp = row.get("db_timestamp", classOf[Instant])
      val readTimestamp = row.get("read_db_timestamp", classOf[Instant])

      if (!backtracking)
        SerializedJournalRow(
          slice = rowSlice,
          entityType,
          persistenceId = pid,
          seqNr = sequenceNr,
          dbTimestamp = writeTimestamp,
          readDbTimestamp = readTimestamp,
          payload = Some(row.get("event_payload", classOf[Array[Byte]])),
          serId = row.get[Integer]("event_ser_id", classOf[Integer]),
          serManifest = row.get("event_ser_manifest", classOf[String]),
          writerUuid = "", // writer uuid is not needed by this query
          tags = Set.empty, // tags are not fetched by queries (yet)
          metadata = readMetadata(row))
      else
        SerializedJournalRow(
          slice = rowSlice,
          entityType,
          persistenceId = pid,
          seqNr = sequenceNr,
          dbTimestamp = writeTimestamp,
          readDbTimestamp = readTimestamp,
          payload = None, // the payload is lazy loaded for backtracking
          serId = 0,
          serManifest = "",
          writerUuid = "", // writer uuid is not needed by this query
          tags = Set.empty, // tags are not fetched by queries (yet)
          metadata = None)
    })

  // Only register the logging callback when debug output is actually enabled.
  if (log.isDebugEnabled)
    result.foreach { rows =>
      log.debug(
        "Read [{}] events from slices [{} - {}]",
        rows.size: java.lang.Integer,
        minSlice: java.lang.Integer,
        maxSlice: java.lang.Integer)
    }

  val futureRows = result.map(rows => Source(rows))
  Source.futureSource(futureRows).mapMaterializedValue(_ => NotUsed)
}