in jdbc/src/main/scala/org/apache/pekko/projection/jdbc/internal/JdbcOffsetStore.scala [108:149]
def readOffset[Offset](projectionId: ProjectionId): Future[Option[Offset]] =
withConnection(jdbcSessionFactory) { conn =>
if (verboseLogging)
logger.debug("reading offset for [{}], using connection id [{}]", projectionId, System.identityHashCode(conn))
// init Statement in try-with-resource
tryWithResource(conn.prepareStatement(settings.dialect.readOffsetQuery)) { stmt =>
stmt.setString(1, projectionId.name)
// init ResultSet in try-with-resource
tryWithResource(stmt.executeQuery()) { resultSet =>
val buffer = ListBuffer.empty[SingleOffset]
while (resultSet.next()) {
val offsetStr = resultSet.getString("CURRENT_OFFSET")
val manifest = resultSet.getString("MANIFEST")
val mergeable = resultSet.getBoolean("MERGEABLE")
val key = resultSet.getString("PROJECTION_KEY")
val adaptedProjectionId = ProjectionId(projectionId.name, key)
val single = SingleOffset(adaptedProjectionId, manifest, offsetStr, mergeable)
buffer.append(single)
}
val result =
if (buffer.isEmpty) None
else if (buffer.forall(_.mergeable)) {
Some(
fromStorageRepresentation[MergeableOffset[_], Offset](MultipleOffsets(buffer.toList))
.asInstanceOf[Offset])
} else {
buffer.find(_.id == projectionId).map(fromStorageRepresentation[Offset, Offset])
}
if (verboseLogging) logger.debug2("found offset [{}] for [{}]", result, projectionId)
result
}
}
}