in core/src/main/scala/org/apache/pekko/persistence/r2dbc/query/scaladsl/R2dbcReadJournal.scala [402:441]
/**
 * Stream the persistence ids, ordered, starting after the given id.
 *
 * @param afterId exclusive lower bound; `None` starts from the beginning
 * @param limit maximum number of ids emitted
 */
override def currentPersistenceIds(afterId: Option[String], limit: Long): Source[String, NotUsed] = {
  // Single page — delegate straight to the query DAO.
  queryDao.persistenceIds(afterId, limit)
}
/**
 * Stream all persistence ids by paging through the database with a
 * [[ContinuousQuery]]: each page fetches at most `persistenceIdsBufferSize`
 * ids and the next page resumes after the last id seen. The stream completes
 * when a page comes back short of the buffer size.
 */
override def currentPersistenceIds(): Source[String, NotUsed] = {
  import settings.querySettings.persistenceIdsBufferSize

  // Count the rows of the current page and remember the last pid seen,
  // so the following page knows where to resume.
  def accumulate(state: PersistenceIdsQueryState, pid: String): PersistenceIdsQueryState =
    state.copy(rowCount = state.rowCount + 1, latestPid = pid)

  // Decide whether another page is needed: the very first query always runs;
  // a follow-up runs only if the previous page filled the whole buffer.
  def nextPage(state: PersistenceIdsQueryState): (PersistenceIdsQueryState, Option[Source[String, NotUsed]]) = {
    val previousPageWasFull = state.rowCount >= persistenceIdsBufferSize
    if (state.queryCount == 0L || previousPageWasFull) {
      if (state.queryCount != 0 && log.isDebugEnabled())
        log.debug(
          "persistenceIds query [{}] after [{}]. Found [{}] rows in previous query.",
          state.queryCount: java.lang.Integer,
          state.latestPid,
          state.rowCount: java.lang.Integer)
      // Empty latestPid means no row has been seen yet — start from the beginning.
      val resumeAfter = if (state.latestPid == "") None else Some(state.latestPid)
      val resetState = state.copy(rowCount = 0, queryCount = state.queryCount + 1)
      resetState -> Some(queryDao.persistenceIds(resumeAfter, persistenceIdsBufferSize))
    } else {
      if (log.isDebugEnabled)
        log.debug(
          "persistenceIds query [{}] completed. Found [{}] rows in previous query.",
          state.queryCount,
          state.rowCount)
      state -> None
    }
  }

  ContinuousQuery[PersistenceIdsQueryState, String](
    initialState = PersistenceIdsQueryState(0, 0, ""),
    updateState = accumulate,
    delayNextQuery = _ => None,
    nextQuery = nextPage)
    .mapMaterializedValue(_ => NotUsed)
}