def saveOffsetBlocking[Offset]()

in jdbc/src/main/scala/org/apache/pekko/projection/jdbc/internal/JdbcOffsetStore.scala [164:229]


  def saveOffsetBlocking[Offset](conn: Connection, projectionId: ProjectionId, offset: Offset): Done = {

    if (verboseLogging)
      logger.debug("saving offset [{}], using connection id [{}]", offset, System.identityHashCode(conn))

    val now = Instant.now(clock).toEpochMilli

    val storageReps = toStorageRepresentation(projectionId, offset)

    // Statement.EXECUTE_FAILED  (-3) means statement failed
    // -2 means successful, but there is no information about it (that's driver dependent).
    // 0 means nothing inserted or updated,
    // any positive number indicates the num of rows affected.
    // What a mess!!
    def failedStatement(i: Int) = i == 0 || i == Statement.EXECUTE_FAILED

    def insertOrUpdate(singleOffset: SingleOffset): Unit = {
      val tryUpdateResult =
        tryWithResource(conn.prepareStatement(settings.dialect.updateStatement())) { stmt =>
          // SET
          stmt.setString(UpdateIndices.OFFSET, singleOffset.offsetStr)
          stmt.setString(UpdateIndices.MANIFEST, singleOffset.manifest)
          stmt.setBoolean(UpdateIndices.MERGEABLE, singleOffset.mergeable)
          stmt.setLong(UpdateIndices.LAST_UPDATED, now)
          // WHERE
          stmt.setString(UpdateIndices.PROJECTION_NAME, singleOffset.id.name)
          stmt.setString(UpdateIndices.PROJECTION_KEY, singleOffset.id.key)

          stmt.executeUpdate()
        }

      if (verboseLogging) {
        logger.debug2("tried to update offset [{}], statement result [{}]", offset, tryUpdateResult)
      }

      if (failedStatement(tryUpdateResult)) {
        tryWithResource(conn.prepareStatement(settings.dialect.insertStatement())) { stmt =>
          // VALUES
          stmt.setString(InsertIndices.PROJECTION_NAME, singleOffset.id.name)
          stmt.setString(InsertIndices.PROJECTION_KEY, singleOffset.id.key)
          stmt.setString(InsertIndices.OFFSET, singleOffset.offsetStr)
          stmt.setString(InsertIndices.MANIFEST, singleOffset.manifest)
          stmt.setBoolean(InsertIndices.MERGEABLE, singleOffset.mergeable)
          stmt.setLong(InsertIndices.LAST_UPDATED, now)

          val triedInsertResult = stmt.executeUpdate()

          if (verboseLogging)
            logger.debug2("tried to insert offset [{}], batch result [{}]", offset, triedInsertResult)

          // did we get any failure on inserts?!
          if (failedStatement(triedInsertResult)) {
            throw new RuntimeException(s"Failed to insert offset [$singleOffset]")
          }
        }
      }

    }

    storageReps match {
      case single: SingleOffset  => insertOrUpdate(single)
      case MultipleOffsets(many) => many.foreach(insertOrUpdate)
    }

    Done
  }