def nextOffset()

in core/src/main/scala/org/apache/pekko/persistence/r2dbc/internal/BySliceQuery.scala [211:262]


    def nextOffset(state: QueryState, envelope: Envelope): QueryState =
      state.copy(latest = extractOffset(envelope), rowCount = state.rowCount + 1)

    def nextQuery(state: QueryState, endTimestamp: Instant): (QueryState, Option[Source[Envelope, NotUsed]]) = {
      // Note that we can't know how many events with the same timestamp that are filtered out
      // so continue until rowCount is 0. That means an extra query at the end to make sure there are no
      // more to fetch.
      if (state.queryCount == 0L || state.rowCount > 0) {
        val newState = state.copy(rowCount = 0, queryCount = state.queryCount + 1)

        val toTimestamp = newState.nextQueryToTimestamp(settings.querySettings.bufferSize) match {
          case Some(t) =>
            if (t.isBefore(endTimestamp)) t else endTimestamp
          case None =>
            endTimestamp
        }

        if (state.queryCount != 0 && log.isDebugEnabled())
          log.debug(
            "{} next query [{}] from slices [{} - {}], between time [{} - {}]. Found [{}] rows in previous query.",
            logPrefix,
            state.queryCount: java.lang.Long,
            minSlice: java.lang.Integer,
            maxSlice: java.lang.Integer,
            state.latest.timestamp,
            toTimestamp,
            state.rowCount: java.lang.Integer)

        newState -> Some(
          dao
            .rowsBySlices(
              entityType,
              minSlice,
              maxSlice,
              state.latest.timestamp,
              toTimestamp = Some(toTimestamp),
              behindCurrentTime = Duration.Zero,
              backtracking = false)
            .via(deserializeAndAddOffset(state.latest)))
      } else {
        if (log.isDebugEnabled)
          log.debug(
            "{} query [{}] from slices [{} - {}] completed. Found [{}] rows in previous query.",
            logPrefix,
            state.queryCount: java.lang.Long,
            minSlice: java.lang.Integer,
            maxSlice: java.lang.Integer,
            state.rowCount: java.lang.Integer)

        state -> None
      }
    }