in projection/src/main/scala/org/apache/pekko/projection/r2dbc/internal/R2dbcOffsetStore.scala [354:391]
private def readPrimitiveOffset[Offset](): Future[Option[Offset]] = {
if (settings.isOffsetTableDefined) {
val singleOffsets = r2dbcExecutor.select("read offset")(
conn => {
logger.trace("reading offset for [{}]", projectionId)
conn
.createStatement(selectOffsetSql)
.bind(0, projectionId.name)
},
row => {
val offsetStr = row.get("current_offset", classOf[String])
val manifest = row.get("manifest", classOf[String])
val mergeable = row.get("mergeable", classOf[java.lang.Boolean])
val key = row.get("projection_key", classOf[String])
val adaptedProjectionId = ProjectionId(projectionId.name, key)
SingleOffset(adaptedProjectionId, manifest, offsetStr, mergeable)
})
singleOffsets.map { offsets =>
val result =
if (offsets.isEmpty) None
else if (offsets.forall(_.mergeable)) {
Some(
fromStorageRepresentation[MergeableOffset[_], Offset](MultipleOffsets(offsets.toList))
.asInstanceOf[Offset])
} else {
offsets.find(_.id == projectionId).map(fromStorageRepresentation[Offset, Offset])
}
logger.trace2("found offset [{}] for [{}]", result, projectionId)
result
}
} else {
Future.successful(None)
}
}