in core/src/main/scala/org/apache/pekko/persistence/r2dbc/internal/BySliceQuery.scala [287:419]
/**
 * Creates the live query for an entity type and slice range by wiring the local
 * state-transition functions defined below into a [[ContinuousQuery]].
 *
 * The query state starts at `offset` (converted with `toTimestampOffset`). For each round
 * `dao.rowsBySlices` is asked for rows between the timestamps derived from the current state,
 * and the state switches between the normal mode and a backtracking mode that reads again
 * from `latestBacktracking`.
 *
 * @param logPrefix  prefix used in the debug log statements
 * @param entityType entity type passed on to the DAO
 * @param minSlice   first slice of the range passed on to the DAO
 * @param maxSlice   last slice of the range passed on to the DAO
 * @param offset     offset to start from
 * @return the source produced by `ContinuousQuery`
 * @throws IllegalArgumentException (from the state update) when an envelope carries a timestamp
 *                                  that is before the latest one already recorded for the current mode
 */
def liveBySlices(
    logPrefix: String,
    entityType: String,
    minSlice: Int,
    maxSlice: Int,
    offset: Offset): Source[Envelope, NotUsed] = {
  val startOffset = toTimestampOffset(offset)

  if (log.isDebugEnabled())
    log.debug(
      "Starting {} query from slices [{} - {}], from time [{}].",
      logPrefix,
      minSlice: java.lang.Integer,
      maxSlice: java.lang.Integer,
      startOffset.timestamp)

  // True when the state is in backtracking mode and the last query produced
  // fewer than `bufferSize - 1` rows.
  def backtrackingDrained(state: QueryState): Boolean =
    state.backtracking && state.rowCount < settings.querySettings.bufferSize - 1

  // Folds one envelope into the state: bumps the row count and records the envelope's offset
  // as the latest one for the current mode. Timestamps must not go backwards within a mode.
  def advanceOffset(state: QueryState, envelope: Envelope): QueryState = {
    val seen = extractOffset(envelope)
    val countedRows = state.rowCount + 1
    if (!state.backtracking) {
      if (seen.timestamp.isBefore(state.latest.timestamp))
        throw new IllegalArgumentException(s"Unexpected offset [$seen] before latest [${state.latest}].")
      state.copy(latest = seen, rowCount = countedRows)
    } else {
      if (seen.timestamp.isBefore(state.latestBacktracking.timestamp))
        throw new IllegalArgumentException(
          s"Unexpected offset [$seen] before latestBacktracking [${state.latestBacktracking}].")
      state.copy(latestBacktracking = seen, rowCount = countedRows)
    }
  }

  // Delay to apply before the next query, or None for no delay.
  def pauseBeforeNextQuery(state: QueryState): Option[FiniteDuration] =
    if (backtrackingDrained(state)) {
      // switch from backtracking immediately
      None
    } else {
      val pause = ContinuousQuery.adjustNextDelay(
        state.rowCount,
        settings.querySettings.bufferSize,
        settings.querySettings.refreshInterval)
      if (log.isDebugEnabled)
        pause.foreach { d =>
          log.debug(
            "{} query [{}] from slices [{} - {}] delay next [{}] ms.",
            logPrefix,
            state.queryCount: java.lang.Long,
            minSlice: java.lang.Integer,
            maxSlice: java.lang.Integer,
            d.toMillis: java.lang.Long)
        }
      pause
    }

  // Computes the state for the next round and the source of rows to run for it.
  def buildNextQuery(state: QueryState): (QueryState, Option[Source[Envelope, NotUsed]]) = {
    // consecutive queries without rows; reset as soon as a query returned something
    val idleQueries = if (state.rowCount == 0) state.idleCount + 1 else 0

    // FIXME config for idleQueries >= 5 and maybe something like `idleQueries % 5 == 0`
    val startBacktracking =
      settings.querySettings.backtrackingEnabled &&
      !state.backtracking &&
      state.latest != TimestampOffset.Zero &&
      (idleQueries >= 5 ||
      JDuration
        .between(state.latestBacktracking.timestamp, state.latest.timestamp)
        .compareTo(halfBacktrackingWindow) > 0)

    // every branch resets the row count, bumps the query count and stores the idle count
    val resetState = state.copy(rowCount = 0, queryCount = state.queryCount + 1, idleCount = idleQueries)

    val upcoming =
      if (startBacktracking) {
        // switching to backtracking; the very first backtracking query starts
        // `firstBacktrackingQueryWindow` before the latest seen timestamp
        val resumeFrom =
          if (state.latestBacktracking == TimestampOffset.Zero)
            TimestampOffset.Zero.copy(timestamp = state.latest.timestamp.minus(firstBacktrackingQueryWindow))
          else
            state.latestBacktracking
        resetState.copy(backtracking = true, latestBacktracking = resumeFrom)
      } else if (backtrackingDrained(state)) {
        // switch from backtracking
        resetState.copy(backtracking = false)
      } else {
        // continue in the current mode
        resetState
      }

    val behindCurrentTime =
      if (!upcoming.backtracking) settings.querySettings.behindCurrentTime
      else settings.querySettings.backtrackingBehindCurrentTime

    val fromTimestamp = upcoming.nextQueryFromTimestamp
    val toTimestamp = upcoming.nextQueryToTimestamp(settings.querySettings.bufferSize)

    if (log.isDebugEnabled()) {
      val modeSuffix = if (upcoming.backtracking) " in backtracking mode" else ""
      val previousSummary =
        if (idleQueries >= 3) s"Idle in [$idleQueries] queries."
        else if (state.backtracking) s"Found [${state.rowCount}] rows in previous backtracking query."
        else s"Found [${state.rowCount}] rows in previous query."
      log.debug(
        "{} next query [{}]{} from slices [{} - {}], between time [{} - {}]. {}",
        logPrefix,
        upcoming.queryCount: java.lang.Long,
        modeSuffix,
        minSlice: java.lang.Integer,
        maxSlice: java.lang.Integer,
        fromTimestamp,
        toTimestamp.getOrElse(None),
        previousSummary)
    }

    val rows: Source[Envelope, NotUsed] =
      dao
        .rowsBySlices(
          entityType,
          minSlice,
          maxSlice,
          fromTimestamp,
          toTimestamp,
          behindCurrentTime,
          backtracking = upcoming.backtracking)
        .via(deserializeAndAddOffset(upcoming.currentOffset))

    (upcoming, Some(rows))
  }

  ContinuousQuery[QueryState, Envelope](
    initialState = QueryState.empty.copy(latest = startOffset),
    updateState = advanceOffset,
    delayNextQuery = pauseBeforeNextQuery,
    nextQuery = buildNextQuery,
    beforeQuery = beforeQuery(logPrefix, entityType, minSlice, maxSlice, _))
}