in core/src/main/scala/org/apache/pekko/persistence/r2dbc/query/scaladsl/R2dbcReadJournal.scala [247:295]
  @InternalApi private[r2dbc] def internalEventsByPersistenceId(
      persistenceId: String,
      fromSequenceNr: Long,
      toSequenceNr: Long): Source[SerializedJournalRow, NotUsed] = {
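
    // Called for every row the current query emits: count the rows produced by
    // this query and remember the latest sequence number seen, so that a
    // follow-up query can resume right after it.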
    def updateState(state: ByPersistenceIdState, row: SerializedJournalRow): ByPersistenceIdState =
      state.copy(rowCount = state.rowCount + 1, latestSeqNr = row.seqNr)
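
    // Decides whether another query is needed: a new query is issued for the
    // first page, or when the previous query filled the configured buffer size
    // and may therefore have left rows behind. Returning None completes the stream.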
    def nextQuery(
        state: ByPersistenceIdState,
        highestSeqNr: Long): (ByPersistenceIdState, Option[Source[SerializedJournalRow, NotUsed]]) = {
      if (state.queryCount == 0L || state.rowCount >= settings.querySettings.bufferSize) {
        val newState = state.copy(rowCount = 0, queryCount = state.queryCount + 1)

        if (state.queryCount != 0 && log.isDebugEnabled())
          log.debug(
            "currentEventsByPersistenceId query [{}] for persistenceId [{}], from [{}] to [{}]. Found [{}] rows in previous query.",
            state.queryCount: java.lang.Integer,
            persistenceId,
            state.latestSeqNr + 1: java.lang.Long,
            highestSeqNr: java.lang.Long,
            state.rowCount: java.lang.Integer)

        newState -> Some(
          queryDao
            .eventsByPersistenceId(persistenceId, state.latestSeqNr + 1, highestSeqNr))
      } else {
        log.debug(
          "currentEventsByPersistenceId query [{}] for persistenceId [{}] completed. Found [{}] rows in previous query.",
          state.queryCount: java.lang.Integer,
          persistenceId,
          state.rowCount: java.lang.Integer)

        state -> None
      }
    }

    if (log.isDebugEnabled())
      log.debug(
        "currentEventsByPersistenceId query for persistenceId [{}], from [{}] to [{}].",
        persistenceId,
        fromSequenceNr: java.lang.Long,
        toSequenceNr: java.lang.Long)
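
    // ContinuousQuery runs the sources produced by nextQuery one after the other,
    // feeding every emitted row through updateState, until nextQuery returns None.
    // latestSeqNr starts at fromSequenceNr - 1 so the first query begins at
    // fromSequenceNr; delayNextQuery = None means pages are fetched back-to-back.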
    ContinuousQuery[ByPersistenceIdState, SerializedJournalRow](
      initialState = ByPersistenceIdState(0, 0, latestSeqNr = fromSequenceNr - 1),
      updateState = updateState,
      delayNextQuery = _ => None,
      nextQuery = state => nextQuery(state, toSequenceNr))
  }
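
The paging above is driven by the internal ContinuousQuery helper. As an
illustration of the same "keep querying while the previous page was full" idea,
here is a minimal, standalone sketch using plain Pekko Streams. All names in it
(Row, fetchPage, pagedEvents, PagedEventsSketch) are hypothetical and it is not
how ContinuousQuery is implemented; it only demonstrates the paging technique.

import scala.concurrent.{ ExecutionContext, Future }

import org.apache.pekko.NotUsed
import org.apache.pekko.stream.scaladsl.Source

object PagedEventsSketch {

  // Hypothetical row type, standing in for SerializedJournalRow.
  final case class Row(seqNr: Long)

  // Keep fetching pages starting after the highest sequence number seen so far,
  // and stop as soon as a page comes back smaller than the page size, mirroring
  // the `rowCount >= bufferSize` check above.
  def pagedEvents(
      fetchPage: (Long, Long) => Future[Seq[Row]], // (fromSeqNr, toSeqNr) => one page of rows
      fromSeqNr: Long,
      toSeqNr: Long,
      pageSize: Int)(implicit ec: ExecutionContext): Source[Row, NotUsed] =
    Source
      .unfoldAsync[Option[Long], Seq[Row]](Option(fromSeqNr - 1)) {
        case None => Future.successful(None) // previous page was partial => stream is complete
        case Some(latestSeqNr) =>
          fetchPage(latestSeqNr + 1, toSeqNr).map { rows =>
            // Continue only if this page filled up; otherwise emit it and stop.
            val next = if (rows.size < pageSize) None else Some(rows.last.seqNr)
            Some((next, rows))
          }
      }
      .mapConcat(identity)
}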