in core/src/main/scala/org/apache/pekko/persistence/r2dbc/query/scaladsl/QueryDao.scala [292:346]
/**
 * Reads a single journal row identified by `persistenceId` and `seqNr`.
 *
 * Executes `selectOneEventSql` through `r2dbcExecutor.selectOne`, binding `persistenceId`
 * at index 0 and `seqNr` at index 1. The persistence id and sequence number of the
 * resulting row are taken from the method arguments; they are not read back from the
 * result set.
 *
 * @param persistenceId value bound to the first statement parameter
 * @param seqNr value bound to the second statement parameter
 * @return the mapped row inside the `Option` produced by `selectOne`
 *         (presumably `None` when no row matches -- confirm against the executor)
 */
def loadEvent(persistenceId: String, seqNr: Long): Future[Option[SerializedJournalRow]] = {
  val logPrefix = "select one event"

  r2dbcExecutor.selectOne(logPrefix)(
    conn => {
      val statement = conn.createStatement(selectOneEventSql)
      val withPersistenceId = statement.bind(0, persistenceId)
      withPersistenceId.bind(1, seqNr)
    },
    resultRow =>
      SerializedJournalRow(
        slice = resultRow.get[Integer]("slice", classOf[Integer]),
        entityType = resultRow.get("entity_type", classOf[String]),
        // Identity comes from the caller's arguments, not from the result set.
        persistenceId = persistenceId,
        seqNr = seqNr,
        dbTimestamp = resultRow.get("db_timestamp", classOf[Instant]),
        readDbTimestamp = resultRow.get("read_db_timestamp", classOf[Instant]),
        payload = Some(resultRow.get("event_payload", classOf[Array[Byte]])),
        serId = resultRow.get[Integer]("event_ser_id", classOf[Integer]),
        serManifest = resultRow.get("event_ser_manifest", classOf[String]),
        writerUuid = "", // not needed by this query
        tags = Set.empty, // tags not fetched in queries (yet)
        metadata = readMetadata(resultRow)))
}
/**
 * Streams the journal rows of one persistence id for a sequence number range.
 *
 * Issues a single `selectEventsSql` query via `r2dbcExecutor.select`, binding, in order,
 * `persistenceId`, `fromSequenceNr`, `toSequenceNr` and `settings.querySettings.bufferSize`.
 * The rows of that one query are then emitted as a `Source`; no follow-up query is issued
 * from this method.
 *
 * NOTE(review): the fourth bind is presumably a row limit and the sequence number bounds
 * are presumably inclusive -- confirm both against `selectEventsSql`.
 *
 * @param persistenceId value bound to the first statement parameter
 * @param fromSequenceNr value bound to the second statement parameter
 * @param toSequenceNr value bound to the third statement parameter
 * @return a source emitting the mapped rows, with its materialized value replaced by `NotUsed`
 */
def eventsByPersistenceId(
    persistenceId: String,
    fromSequenceNr: Long,
    toSequenceNr: Long): Source[SerializedJournalRow, NotUsed] = {
  val logPrefix = s"select eventsByPersistenceId [$persistenceId]"

  val rowsFuture = r2dbcExecutor.select(logPrefix)(
    conn => {
      val statement = conn.createStatement(selectEventsSql)
      val withPersistenceId = statement.bind(0, persistenceId)
      val withFromSeqNr = withPersistenceId.bind(1, fromSequenceNr)
      val withToSeqNr = withFromSeqNr.bind(2, toSequenceNr)
      withToSeqNr.bind(3, settings.querySettings.bufferSize)
    },
    resultRow =>
      SerializedJournalRow(
        slice = resultRow.get[Integer]("slice", classOf[Integer]),
        entityType = resultRow.get("entity_type", classOf[String]),
        persistenceId = resultRow.get("persistence_id", classOf[String]),
        seqNr = resultRow.get[java.lang.Long]("seq_nr", classOf[java.lang.Long]),
        dbTimestamp = resultRow.get("db_timestamp", classOf[Instant]),
        readDbTimestamp = resultRow.get("read_db_timestamp", classOf[Instant]),
        payload = Some(resultRow.get("event_payload", classOf[Array[Byte]])),
        serId = resultRow.get[Integer]("event_ser_id", classOf[Integer]),
        serManifest = resultRow.get("event_ser_manifest", classOf[String]),
        writerUuid = resultRow.get("writer", classOf[String]),
        tags = Set.empty, // tags not fetched in queries (yet)
        metadata = readMetadata(resultRow)))

  // Only register the logging callback when debug output is actually enabled.
  if (log.isDebugEnabled) {
    rowsFuture.foreach { fetched =>
      log.debug("Read [{}] events for persistenceId [{}]", fetched.size, persistenceId)
    }
  }

  val futureSource = rowsFuture.map(fetched => Source(fetched))
  Source.futureSource(futureSource).mapMaterializedValue(_ => NotUsed)
}