def savePaused()

in jdbc/src/main/scala/org/apache/pekko/projection/jdbc/internal/JdbcOffsetStore.scala [261:330]


  /**
   * Stores the `paused` flag for the given projection using the dialect's management statements.
   *
   * Inside `withConnection`, an update keyed by the projection name and key is attempted first.
   * When that update reports no written row, an insert with the same values is executed instead.
   *
   * @param projectionId projection whose name and key identify the row
   * @param paused       flag value to store
   * @return the result of `withConnection`; the block passed to it yields `Done`
   * @throws RuntimeException raised inside the connection block when the fallback insert also
   *                          reports no written row (how it surfaces to the caller depends on
   *                          `withConnection`, which is not visible here)
   */
  def savePaused(projectionId: ProjectionId, paused: Boolean): Future[Done] = {
    withConnection(jdbcSessionFactory) { conn =>
      if (verboseLogging) {
        logger.debugN(
          "saving paused [{}] for [{}], using connection id [{}]",
          paused,
          projectionId,
          System.identityHashCode(conn))
      }

      // The same timestamp is used for both the update and the fallback insert.
      val timestampMillis = Instant.now(clock).toEpochMilli

      // How a JDBC row count is interpreted here:
      //   Statement.EXECUTE_FAILED (-3) => the statement failed
      //   0                             => nothing was inserted or updated
      //   -2                            => success without row information (driver dependent)
      //   any positive number           => number of affected rows
      // Only the first two are treated as "nothing written".
      def nothingWritten(rowCount: Int): Boolean =
        rowCount match {
          case 0 | Statement.EXECUTE_FAILED => true
          case _                            => false
        }

      // Fallback used when the update did not write a row.
      def insertRow(): Unit =
        tryWithResource(conn.prepareStatement(settings.dialect.insertManagementStatement())) { insert =>
          // VALUES
          insert.setString(InsertManagementIndices.PROJECTION_NAME, projectionId.name)
          insert.setString(InsertManagementIndices.PROJECTION_KEY, projectionId.key)
          insert.setBoolean(InsertManagementIndices.PAUSED, paused)
          insert.setLong(InsertManagementIndices.LAST_UPDATED, timestampMillis)

          val insertedRows = insert.executeUpdate()

          if (verboseLogging) {
            logger.debugN(
              "tried to insert paused [{}] for [{}], batch result [{}]",
              paused,
              projectionId,
              insertedRows)
          }

          // Neither the update nor the insert wrote a row: give up.
          if (nothingWritten(insertedRows))
            throw new RuntimeException(s"Failed to insert paused [$paused] for [$projectionId]")
        }

      // First attempt: update the row matching the projection name and key.
      val updatedRows =
        tryWithResource(conn.prepareStatement(settings.dialect.updateManagementStatement())) { update =>
          // SET
          update.setBoolean(UpdateManagementIndices.PAUSED, paused)
          update.setLong(UpdateManagementIndices.LAST_UPDATED, timestampMillis)
          // WHERE
          update.setString(UpdateManagementIndices.PROJECTION_NAME, projectionId.name)
          update.setString(UpdateManagementIndices.PROJECTION_KEY, projectionId.key)

          update.executeUpdate()
        }

      if (verboseLogging) {
        logger.debugN(
          s"tried to update paused [{}] for [{}], statement result [{}]",
          paused,
          projectionId,
          updatedRows)
      }

      if (nothingWritten(updatedRows)) {
        insertRow()
      }

      Done
    }
  }