in jdbc/src/main/scala/org/apache/pekko/projection/jdbc/internal/JdbcOffsetStore.scala [164:229]
/**
 * Stores `offset` for `projectionId` through the given JDBC connection, blocking the caller.
 *
 * The offset is first converted with `toStorageRepresentation`. Every resulting `SingleOffset` row is
 * then written by running the dialect's UPDATE statement and, when that reports that no row was
 * touched, the dialect's INSERT statement instead.
 *
 * @param conn connection on which the statements are prepared and executed
 * @param projectionId projection whose offset is being stored
 * @param offset offset to store
 * @return `Done` once every row has been written
 * @throws RuntimeException if the fallback INSERT reports that no row was written
 */
def saveOffsetBlocking[Offset](conn: Connection, projectionId: ProjectionId, offset: Offset): Done = {
  if (verboseLogging)
    logger.debug("saving offset [{}], using connection id [{}]", offset, System.identityHashCode(conn))

  // A single timestamp is shared by every row written during this call.
  val timestampMillis = Instant.now(clock).toEpochMilli
  val representation = toStorageRepresentation(projectionId, offset)

  // How a statement result is interpreted here:
  //   Statement.EXECUTE_FAILED (-3) : the statement failed
  //   -2                            : success, but the driver reports no row count (driver dependent)
  //   0                             : nothing was inserted or updated
  //   any positive number           : number of affected rows
  // Only 0 and EXECUTE_FAILED are treated as "nothing written".
  def nothingWritten(statementResult: Int): Boolean =
    statementResult match {
      case 0 | Statement.EXECUTE_FAILED => true
      case _                            => false
    }

  // Runs the dialect's UPDATE statement for one row and returns its result.
  def updateRow(row: SingleOffset): Int =
    tryWithResource(conn.prepareStatement(settings.dialect.updateStatement())) { update =>
      // SET clause
      update.setString(UpdateIndices.OFFSET, row.offsetStr)
      update.setString(UpdateIndices.MANIFEST, row.manifest)
      update.setBoolean(UpdateIndices.MERGEABLE, row.mergeable)
      update.setLong(UpdateIndices.LAST_UPDATED, timestampMillis)
      // WHERE clause
      update.setString(UpdateIndices.PROJECTION_NAME, row.id.name)
      update.setString(UpdateIndices.PROJECTION_KEY, row.id.key)
      update.executeUpdate()
    }

  // Runs the dialect's INSERT statement for one row; throws when nothing was written.
  def insertRow(row: SingleOffset): Unit =
    tryWithResource(conn.prepareStatement(settings.dialect.insertStatement())) { insert =>
      // VALUES clause
      insert.setString(InsertIndices.PROJECTION_NAME, row.id.name)
      insert.setString(InsertIndices.PROJECTION_KEY, row.id.key)
      insert.setString(InsertIndices.OFFSET, row.offsetStr)
      insert.setString(InsertIndices.MANIFEST, row.manifest)
      insert.setBoolean(InsertIndices.MERGEABLE, row.mergeable)
      insert.setLong(InsertIndices.LAST_UPDATED, timestampMillis)

      val insertResult = insert.executeUpdate()
      if (verboseLogging)
        logger.debug2("tried to insert offset [{}], batch result [{}]", offset, insertResult)

      // The UPDATE already wrote nothing; if the INSERT wrote nothing either, the row is not stored.
      if (nothingWritten(insertResult))
        throw new RuntimeException(s"Failed to insert offset [$row]")
    }

  // UPDATE first, and fall back to INSERT only when the UPDATE wrote nothing.
  def upsert(row: SingleOffset): Unit = {
    val updateResult = updateRow(row)
    if (verboseLogging)
      logger.debug2("tried to update offset [{}], statement result [{}]", offset, updateResult)
    if (nothingWritten(updateResult))
      insertRow(row)
  }

  representation match {
    case row: SingleOffset     => upsert(row)
    case MultipleOffsets(rows) => rows.foreach(upsert)
  }

  Done
}