in core/src/main/scala/org/apache/pekko/persistence/r2dbc/state/scaladsl/R2dbcDurableStateStore.scala [114:179]
override def deleteObject(persistenceId: String): Future[Done] =
stateDao.deleteState(persistenceId)
override def deleteObject(persistenceId: String, revision: Long): Future[Done] =
stateDao.deleteState(persistenceId)
override def sliceForPersistenceId(persistenceId: String): Int =
persistenceExt.sliceForPersistenceId(persistenceId)
override def sliceRanges(numberOfRanges: Int): immutable.Seq[Range] =
persistenceExt.sliceRanges(numberOfRanges)
override def currentChangesBySlices(
entityType: String,
minSlice: Int,
maxSlice: Int,
offset: Offset): Source[DurableStateChange[A], NotUsed] =
bySlice.currentBySlices("currentChangesBySlices", entityType, minSlice, maxSlice, offset)
override def changesBySlices(
entityType: String,
minSlice: Int,
maxSlice: Int,
offset: Offset): Source[DurableStateChange[A], NotUsed] =
bySlice.liveBySlices("changesBySlices", entityType, minSlice, maxSlice, offset)
override def currentPersistenceIds(afterId: Option[String], limit: Long): Source[String, NotUsed] =
stateDao.persistenceIds(afterId, limit)
def currentPersistenceIds(): Source[String, NotUsed] = {
import settings.querySettings.persistenceIdsBufferSize
def updateState(state: PersistenceIdsQueryState, pid: String): PersistenceIdsQueryState =
state.copy(rowCount = state.rowCount + 1, latestPid = pid)
def nextQuery(state: PersistenceIdsQueryState): (PersistenceIdsQueryState, Option[Source[String, NotUsed]]) = {
if (state.queryCount == 0L || state.rowCount >= persistenceIdsBufferSize) {
val newState = state.copy(rowCount = 0, queryCount = state.queryCount + 1)
if (state.queryCount != 0 && log.isDebugEnabled())
log.debug(
"persistenceIds query [{}] after [{}]. Found [{}] rows in previous query.",
state.queryCount: java.lang.Integer,
state.latestPid,
state.rowCount: java.lang.Integer)
newState -> Some(
stateDao
.persistenceIds(if (state.latestPid == "") None else Some(state.latestPid), persistenceIdsBufferSize))
} else {
if (log.isDebugEnabled)
log.debug(
"persistenceIds query [{}] completed. Found [{}] rows in previous query.",
state.queryCount,
state.rowCount)
state -> None
}
}
ContinuousQuery[PersistenceIdsQueryState, String](
initialState = PersistenceIdsQueryState(0, 0, ""),
updateState = updateState,
delayNextQuery = _ => None,
nextQuery = state => nextQuery(state))
.mapMaterializedValue(_ => NotUsed)
}