in core/src/main/scala/org/apache/pekko/persistence/r2dbc/internal/BySliceQuery.scala [211:262]
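
// Advances the query state after emitting an envelope: records the envelope's
// offset as the latest seen and counts the row for the current query.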
def nextOffset(state: QueryState, envelope: Envelope): QueryState =
  state.copy(latest = extractOffset(envelope), rowCount = state.rowCount + 1)

def nextQuery(state: QueryState, endTimestamp: Instant): (QueryState, Option[Source[Envelope, NotUsed]]) = {
  // Note that we can't know how many events with the same timestamp are filtered out,
  // so we continue until rowCount is 0. That means one extra (empty) query at the end
  // to make sure there are no more rows to fetch; see the driver sketch after this method.
  if (state.queryCount == 0L || state.rowCount > 0) {
    val newState = state.copy(rowCount = 0, queryCount = state.queryCount + 1)
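
    // Upper bound for the next query window: what nextQueryToTimestamp suggests
    // for the configured buffer size, capped at endTimestamp.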
    val toTimestamp = newState.nextQueryToTimestamp(settings.querySettings.bufferSize) match {
      case Some(t) =>
        if (t.isBefore(endTimestamp)) t else endTimestamp
      case None =>
        endTimestamp
    }
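
    // The first query (queryCount == 0) has no previous page to report on,
    // so the progress log is skipped for it.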
    if (state.queryCount != 0 && log.isDebugEnabled())
      log.debug(
        "{} next query [{}] from slices [{} - {}], between time [{} - {}]. Found [{}] rows in previous query.",
        logPrefix,
        state.queryCount: java.lang.Long,
        minSlice: java.lang.Integer,
        maxSlice: java.lang.Integer,
        state.latest.timestamp,
        toTimestamp,
        state.rowCount: java.lang.Integer)
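
    // Emit the next page of rows, deserialized into envelopes that carry
    // their offsets (via deserializeAndAddOffset).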
    newState -> Some(
      dao
        .rowsBySlices(
          entityType,
          minSlice,
          maxSlice,
          state.latest.timestamp,
          toTimestamp = Some(toTimestamp),
          behindCurrentTime = Duration.Zero,
          backtracking = false)
        .via(deserializeAndAddOffset(state.latest)))
  } else {
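    // rowCount == 0 after at least one query: the previous query was empty,
    // so there is nothing more to fetch and the stream completes.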
    if (log.isDebugEnabled)
      log.debug(
        "{} query [{}] from slices [{} - {}] completed. Found [{}] rows in previous query.",
        logPrefix,
        state.queryCount: java.lang.Long,
        minSlice: java.lang.Integer,
        maxSlice: java.lang.Integer,
        state.rowCount: java.lang.Integer)
    state -> None
  }
}
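
// ---------------------------------------------------------------------------
// Illustrative sketch, not part of the original file. It shows one way a
// caller could drive nextQuery to completion: keep issuing queries while the
// previous one returned rows, and let the final empty query end the stream,
// matching the comment at the top of nextQuery. The real code wires nextQuery
// into an internal continuous-query stage; runToEnd and its page buffering
// are hypothetical scaffolding, while QueryState, Envelope, nextQuery and
// nextOffset are the members shown above.
// ---------------------------------------------------------------------------
import java.time.Instant
import scala.concurrent.{ ExecutionContext, Future }
import org.apache.pekko.NotUsed
import org.apache.pekko.actor.ActorSystem
import org.apache.pekko.stream.scaladsl.{ Sink, Source }

def runToEnd(initialState: QueryState, endTimestamp: Instant)(
    implicit system: ActorSystem, ec: ExecutionContext): Source[Envelope, NotUsed] =
  Source
    .unfoldAsync[QueryState, Vector[Envelope]](initialState) { state =>
      nextQuery(state, endTimestamp) match {
        case (_, None) =>
          // The previous query returned no rows: nothing more to fetch.
          Future.successful(None)
        case (newState, Some(rows)) =>
          // Drain one page, advancing the state with nextOffset per envelope
          // so that rowCount and the latest offset are up to date for the
          // next iteration. Buffering one page keeps the sketch simple.
          rows
            .runWith(Sink.fold((newState, Vector.empty[Envelope])) {
              case ((s, acc), env) => (nextOffset(s, env), acc :+ env)
            })
            .map { case (s, page) => Some((s, page)) }
      }
    }
    .mapConcat(identity)

// A caller would then run e.g. runToEnd(startState, Instant.now()).runForeach(handle),
// with startState and handle supplied by the surrounding code.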