in core/src/main/scala/org/apache/pekko/persistence/r2dbc/query/scaladsl/R2dbcReadJournal.scala [314:372]
override def eventsByPersistenceId(
    persistenceId: String,
    fromSequenceNr: Long,
    toSequenceNr: Long): Source[ClassicEventEnvelope, NotUsed] = {
  log.debug("Starting eventsByPersistenceId query for persistenceId [{}], from [{}].", persistenceId, fromSequenceNr)
  def nextOffset(state: ByPersistenceIdState, row: SerializedJournalRow): ByPersistenceIdState =
    state.copy(rowCount = state.rowCount + 1, latestSeqNr = row.seqNr)
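
  // Decide how long to wait before the next query round, based on how many rows the
  // previous round returned relative to the configured buffer size and refresh interval.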
  def delayNextQuery(state: ByPersistenceIdState): Option[FiniteDuration] = {
    val delay = ContinuousQuery.adjustNextDelay(
      state.rowCount,
      settings.querySettings.bufferSize,
      settings.querySettings.refreshInterval)
    delay.foreach { d =>
      log.debug(
        "eventsByPersistenceId query [{}] for persistenceId [{}] delay next [{}] ms.",
        state.queryCount: java.lang.Integer,
        persistenceId,
        d.toMillis: java.lang.Long)
    }
    delay
  }
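
  // Produce the next query for the remaining sequence number range, or None once
  // toSequenceNr has been reached and the stream can complete.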
  def nextQuery(
      state: ByPersistenceIdState): (ByPersistenceIdState, Option[Source[SerializedJournalRow, NotUsed]]) = {
    if (state.latestSeqNr >= toSequenceNr) {
      log.debug(
        "eventsByPersistenceId query [{}] for persistenceId [{}] completed. Found [{}] rows in previous query.",
        state.queryCount: java.lang.Integer,
        persistenceId,
        state.rowCount: java.lang.Integer)
      state -> None
    } else {
      val newState = state.copy(rowCount = 0, queryCount = state.queryCount + 1)
      log.debug(
        "eventsByPersistenceId query [{}] for persistenceId [{}], from [{}]. Found [{}] rows in previous query.",
        newState.queryCount: java.lang.Integer,
        persistenceId,
        (state.latestSeqNr + 1): java.lang.Long,
        state.rowCount: java.lang.Integer)
      newState ->
        Some(
          queryDao
            .eventsByPersistenceId(persistenceId, state.latestSeqNr + 1, toSequenceNr))
    }
  }
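
  // Drive the polling loop: start just below fromSequenceNr, keep issuing queries until
  // toSequenceNr is covered, and deserialize each journal row into a classic event envelope.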
  ContinuousQuery[ByPersistenceIdState, SerializedJournalRow](
    initialState = ByPersistenceIdState(0, 0, latestSeqNr = fromSequenceNr - 1),
    updateState = nextOffset,
    delayNextQuery = delayNextQuery,
    nextQuery = nextQuery)
    .map(deserializeRow)
}
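
// Hypothetical usage sketch (not part of this file): how application code might obtain the
// read journal and run this query. The actor system setup and persistence id below are
// illustrative assumptions; the plugin is looked up via R2dbcReadJournal.Identifier.
//
//   import org.apache.pekko.actor.ActorSystem
//   import org.apache.pekko.persistence.query.PersistenceQuery
//
//   implicit val system: ActorSystem = ActorSystem("example")
//
//   val readJournal =
//     PersistenceQuery(system).readJournalFor[R2dbcReadJournal](R2dbcReadJournal.Identifier)
//
//   readJournal
//     .eventsByPersistenceId("some-persistence-id", fromSequenceNr = 0L, toSequenceNr = Long.MaxValue)
//     .runForeach(envelope => println(envelope.event))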