def liveBySlices()

in core/src/main/scala/org/apache/pekko/persistence/r2dbc/internal/BySliceQuery.scala [287:419]


  def liveBySlices(
      logPrefix: String,
      entityType: String,
      minSlice: Int,
      maxSlice: Int,
      offset: Offset): Source[Envelope, NotUsed] = {
    val initialOffset = toTimestampOffset(offset)

    if (log.isDebugEnabled())
      log.debug(
        "Starting {} query from slices [{} - {}], from time [{}].",
        logPrefix,
        minSlice: java.lang.Integer,
        maxSlice: java.lang.Integer,
        initialOffset.timestamp)

    def nextOffset(state: QueryState, envelope: Envelope): QueryState = {
      val offset = extractOffset(envelope)
      if (state.backtracking) {
        if (offset.timestamp.isBefore(state.latestBacktracking.timestamp))
          throw new IllegalArgumentException(
            s"Unexpected offset [$offset] before latestBacktracking [${state.latestBacktracking}].")

        state.copy(latestBacktracking = offset, rowCount = state.rowCount + 1)
      } else {
        if (offset.timestamp.isBefore(state.latest.timestamp))
          throw new IllegalArgumentException(s"Unexpected offset [$offset] before latest [${state.latest}].")

        state.copy(latest = offset, rowCount = state.rowCount + 1)
      }
    }

    def delayNextQuery(state: QueryState): Option[FiniteDuration] = {
      if (switchFromBacktracking(state)) {
        // switch from from backtracking immediately
        None
      } else {
        val delay = ContinuousQuery.adjustNextDelay(
          state.rowCount,
          settings.querySettings.bufferSize,
          settings.querySettings.refreshInterval)

        if (log.isDebugEnabled)
          delay.foreach { d =>
            log.debug(
              "{} query [{}] from slices [{} - {}] delay next [{}] ms.",
              logPrefix,
              state.queryCount: java.lang.Long,
              minSlice: java.lang.Integer,
              maxSlice: java.lang.Integer,
              d.toMillis: java.lang.Long)
          }

        delay
      }
    }

    def switchFromBacktracking(state: QueryState): Boolean = {
      state.backtracking && state.rowCount < settings.querySettings.bufferSize - 1
    }

    def nextQuery(state: QueryState): (QueryState, Option[Source[Envelope, NotUsed]]) = {
      val newIdleCount = if (state.rowCount == 0) state.idleCount + 1 else 0
      val newState =
        if (settings.querySettings.backtrackingEnabled && !state.backtracking && state.latest != TimestampOffset.Zero &&
          (newIdleCount >= 5 || JDuration
            .between(state.latestBacktracking.timestamp, state.latest.timestamp)
            .compareTo(halfBacktrackingWindow) > 0)) {
          // FIXME config for newIdleCount >= 5 and maybe something like `newIdleCount % 5 == 0`

          // switching to backtracking
          val fromOffset =
            if (state.latestBacktracking == TimestampOffset.Zero)
              TimestampOffset.Zero.copy(timestamp = state.latest.timestamp.minus(firstBacktrackingQueryWindow))
            else
              state.latestBacktracking

          state.copy(
            rowCount = 0,
            queryCount = state.queryCount + 1,
            idleCount = newIdleCount,
            backtracking = true,
            latestBacktracking = fromOffset)
        } else if (switchFromBacktracking(state)) {
          // switch from backtracking
          state.copy(rowCount = 0, queryCount = state.queryCount + 1, idleCount = newIdleCount, backtracking = false)
        } else {
          // continue
          state.copy(rowCount = 0, queryCount = state.queryCount + 1, idleCount = newIdleCount)
        }

      val behindCurrentTime =
        if (newState.backtracking) settings.querySettings.backtrackingBehindCurrentTime
        else settings.querySettings.behindCurrentTime

      val fromTimestamp = newState.nextQueryFromTimestamp
      val toTimestamp = newState.nextQueryToTimestamp(settings.querySettings.bufferSize)

      if (log.isDebugEnabled())
        log.debug(
          "{} next query [{}]{} from slices [{} - {}], between time [{} - {}]. {}",
          logPrefix,
          newState.queryCount: java.lang.Long,
          if (newState.backtracking) " in backtracking mode" else "",
          minSlice: java.lang.Integer,
          maxSlice: java.lang.Integer,
          fromTimestamp,
          toTimestamp.getOrElse(None),
          if (newIdleCount >= 3) s"Idle in [$newIdleCount] queries."
          else if (state.backtracking) s"Found [${state.rowCount}] rows in previous backtracking query."
          else s"Found [${state.rowCount}] rows in previous query.")

      newState ->
      Some(
        dao
          .rowsBySlices(
            entityType,
            minSlice,
            maxSlice,
            fromTimestamp,
            toTimestamp,
            behindCurrentTime,
            backtracking = newState.backtracking)
          .via(deserializeAndAddOffset(newState.currentOffset)))
    }

    ContinuousQuery[QueryState, Envelope](
      initialState = QueryState.empty.copy(latest = initialOffset),
      updateState = nextOffset,
      delayNextQuery = delayNextQuery,
      nextQuery = nextQuery,
      beforeQuery = beforeQuery(logPrefix, entityType, minSlice, maxSlice, _))
  }