in projection/src/main/scala/org/apache/pekko/projection/r2dbc/internal/R2dbcOffsetStore.scala [821:866]
/**
 * Removes old timestamp offset rows for this projection.
 *
 * The delete statement `deleteOldTimestampOffsetSql` is executed with the source provider's min
 * and max slice, the projection name and a cut-off timestamp of
 * `latestTimestamp - settings.timeWindow` bound to it.
 *
 * The delete is skipped, and an already completed `Future` of `0` is returned, when either
 *  - the `idle` flag was already set, i.e. no new offsets were stored since the previous delete, or
 *  - the current state holds at most `settings.keepNumberOfEntries` entries, or its window is
 *    shorter than `settings.timeWindow`.
 *
 * If the delete fails the `idle` flag is cleared so that the next invocation tries again.
 *
 * @return the result of `r2dbcExecutor.updateOne` for the delete statement (logged as the number
 *         of deleted rows), or `0` when the delete was skipped
 */
def deleteOldTimestampOffsets(): Future[Long] = {
  def nothingDeleted: Future[Long] = Future.successful(0L)

  // Mark as idle up front; the previous value tells whether anything changed since last time.
  val wasAlreadyIdle = idle.getAndSet(true)

  if (wasAlreadyIdle) {
    // no new offsets stored since previous delete
    nothingDeleted
  } else {
    val state = getState()
    val windowFilled =
      state.size > settings.keepNumberOfEntries && state.window.compareTo(settings.timeWindow) >= 0

    if (!windowFilled) {
      // it hasn't filled up the window yet
      nothingDeleted
    } else {
      val cutoff = state.latestTimestamp.minus(settings.timeWindow)
      val fromSlice = timestampOffsetBySlicesSourceProvider.minSlice
      val toSlice = timestampOffsetBySlicesSourceProvider.maxSlice

      val deletion = r2dbcExecutor.updateOne("delete old timestamp offset") { connection =>
        val statement = connection.createStatement(deleteOldTimestampOffsetSql)
        statement
          .bind(0, fromSlice)
          .bind(1, toSlice)
          .bind(2, projectionId.name)
          .bind(3, cutoff)
      }

      // FIXME would it be good to keep at least one record per slice that can be used as the
      // starting point for the slice if the slice ranges are changed?

      deletion.failed.foreach { failure =>
        // try again next tick
        idle.set(false)
        logger.warn(
          "Failed to delete timestamp offset until [{}] for projection [{}]: {}",
          cutoff,
          projectionId.id,
          failure.toString)
      }

      // Only register the success callback when debug logging is enabled.
      if (logger.isDebugEnabled) {
        deletion.foreach { deletedRows =>
          logger.debug(
            "Deleted [{}] timestamp offset rows until [{}] for projection [{}].",
            deletedRows: java.lang.Long,
            cutoff,
            projectionId.id)
        }
      }

      deletion
    }
  }
}