def deleteOldTimestampOffsets()

in projection/src/main/scala/org/apache/pekko/projection/r2dbc/internal/R2dbcOffsetStore.scala [821:866]
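
Periodic cleanup of old timestamp offsets: the method is a no-op when no new offsets have been stored since the previous pass, or while the tracked offset state has not yet filled the configured time window; otherwise it issues a single delete for all rows in this projection's slice range that are older than the retention cutoff.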


  def deleteOldTimestampOffsets(): Future[Long] = {
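    // getAndSet(true) returns the previous value, so a true result means the flag was never reset since the last delete pass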
    if (idle.getAndSet(true)) {
      // no new offsets stored since previous delete
      Future.successful(0)
    } else {
      val currentState = getState()
      if (currentState.size <= settings.keepNumberOfEntries || currentState.window.compareTo(settings.timeWindow) < 0) {
        // skip deletion until there are more than keepNumberOfEntries entries and the tracked window spans the full timeWindow
        Future.successful(0)
      } else {
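        // rows older than latestTimestamp minus timeWindow fall outside the retention window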
        val until = currentState.latestTimestamp.minus(settings.timeWindow)
        val minSlice = timestampOffsetBySlicesSourceProvider.minSlice
        val maxSlice = timestampOffsetBySlicesSourceProvider.maxSlice
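        // one statement deletes everything for this projection's slice range that is older than the cutoff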
        val result = r2dbcExecutor.updateOne("delete old timestamp offset") { conn =>
          conn
            .createStatement(deleteOldTimestampOffsetSql)
            .bind(0, minSlice)
            .bind(1, maxSlice)
            .bind(2, projectionId.name)
            .bind(3, until)
        }

        // FIXME would it be good to keep at least one record per slice that can be used as the
        // starting point for the slice if the slice ranges are changed?

        result.failed.foreach { exc =>
          idle.set(false) // try again next tick
          logger.warn(
            "Failed to delete timestamp offset until [{}] for projection [{}]: {}",
            until,
            projectionId.id,
            exc.toString)
        }
        if (logger.isDebugEnabled)
          result.foreach { rows =>
            logger.debug(
              "Deleted [{}] timestamp offset rows until [{}] for projection [{}].",
              rows: java.lang.Long, // box explicitly so the value fits the logger's Object varargs
              until,
              projectionId.id)
          }

        result
      }
    }
  }
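
The SQL text bound above is not part of this excerpt. Going only by the bind order (min slice, max slice, projection name, cutoff timestamp), deleteOldTimestampOffsetSql plausibly has the shape sketched below; the table and column names are assumptions for illustration, not taken from the offset store.

  // Sketch only: the table and column names are assumed, not taken from this excerpt.
  // The four placeholders line up with .bind(0) .. .bind(3) above.
  val deleteOldTimestampOffsetSql: String =
    """DELETE FROM projection_timestamp_offset_store
      |WHERE slice BETWEEN ? AND ?
      |  AND projection_name = ?
      |  AND timestamp_offset < ?""".stripMargin

The idle handling and the "try again next tick" note suggest the method is driven by a recurring tick rather than called ad hoc. A minimal driver along those lines, where the classic ActorSystem handle, the interval, and the offsetStore reference are all assumptions:

  // Hypothetical periodic driver; system, interval, and offsetStore are placeholders.
  import scala.concurrent.duration._
  implicit val ec: scala.concurrent.ExecutionContext = system.dispatcher

  val cleanupTask = system.scheduler.scheduleWithFixedDelay(1.minute, 1.minute) { () =>
    offsetStore.deleteOldTimestampOffsets()
    () // discard the returned Future; failures are already logged inside the method
  }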