in jdbc/src/main/scala/org/apache/pekko/projection/jdbc/internal/JdbcOffsetStore.scala [261:330]
/**
 * Stores the paused flag for the given projection in the management table.
 *
 * The work runs inside `withConnection`: the dialect's UPDATE management statement is tried first
 * and, when it reports that no row was affected, the dialect's INSERT management statement is
 * executed instead. Both statements use the same timestamp, read once from `clock`.
 *
 * @param projectionId projection whose `name` and `key` identify the management row
 * @param paused the paused flag to persist
 * @return a `Future` of `Done` produced by `withConnection`. A `RuntimeException` is thrown
 *         inside the connection callback when the fallback insert affects no row either; how that
 *         surfaces in the returned `Future` is decided by `withConnection` (not visible here).
 */
def savePaused(projectionId: ProjectionId, paused: Boolean): Future[Done] = {
  withConnection(jdbcSessionFactory) { conn =>
    if (verboseLogging)
      logger.debugN(
        "saving paused [{}] for [{}], using connection id [{}]",
        paused,
        projectionId,
        System.identityHashCode(conn))

    // One timestamp shared by the UPDATE attempt and the INSERT fallback.
    val now = Instant.now(clock).toEpochMilli

    // Interpretation of the statement result:
    //  - Statement.EXECUTE_FAILED (-3) means the statement failed,
    //  - -2 means successful, but without information about it (driver dependent),
    //  - 0 means nothing was inserted or updated,
    //  - any positive number is the number of rows affected.
    // Only 0 and EXECUTE_FAILED are treated as "did not write a row".
    def failedStatement(i: Int) = i == 0 || i == Statement.EXECUTE_FAILED

    def insertOrUpdate(): Unit = {
      val tryUpdateResult =
        tryWithResource(conn.prepareStatement(settings.dialect.updateManagementStatement())) { stmt =>
          // SET
          stmt.setBoolean(UpdateManagementIndices.PAUSED, paused)
          stmt.setLong(UpdateManagementIndices.LAST_UPDATED, now)
          // WHERE
          stmt.setString(UpdateManagementIndices.PROJECTION_NAME, projectionId.name)
          stmt.setString(UpdateManagementIndices.PROJECTION_KEY, projectionId.key)
          stmt.executeUpdate()
        }

      if (verboseLogging) {
        logger.debugN(
          "tried to update paused [{}] for [{}], statement result [{}]",
          paused,
          projectionId,
          tryUpdateResult)
      }

      // The update touched no row, so fall back to inserting one.
      if (failedStatement(tryUpdateResult)) {
        tryWithResource(conn.prepareStatement(settings.dialect.insertManagementStatement())) { stmt =>
          // VALUES
          stmt.setString(InsertManagementIndices.PROJECTION_NAME, projectionId.name)
          stmt.setString(InsertManagementIndices.PROJECTION_KEY, projectionId.key)
          stmt.setBoolean(InsertManagementIndices.PAUSED, paused)
          stmt.setLong(InsertManagementIndices.LAST_UPDATED, now)

          val triedInsertResult = stmt.executeUpdate()

          if (verboseLogging)
            logger.debugN(
              "tried to insert paused [{}] for [{}], statement result [{}]",
              paused,
              projectionId,
              triedInsertResult)

          // Neither the update nor the insert wrote a row: report it to the caller.
          if (failedStatement(triedInsertResult)) {
            throw new RuntimeException(s"Failed to insert paused [$paused] for [$projectionId]")
          }
        }
      }
    }

    insertOrUpdate()
    Done
  }
}