private def readPrimitiveOffset[Offset]()

in projection/src/main/scala/org/apache/pekko/projection/r2dbc/internal/R2dbcOffsetStore.scala [354:391]


  /**
   * Reads the stored offset rows for this projection and converts them into a single `Offset`.
   *
   * The query is built from `selectOffsetSql` with `projectionId.name` bound as its first parameter.
   * Every returned row becomes a `SingleOffset` whose id combines `projectionId.name` with the row's
   * `projection_key` column.
   *
   * @tparam Offset the offset type the caller expects back
   * @return a future completed with
   *         - `None` when `settings.isOffsetTableDefined` is false (no query is issued) or when the
   *           query returns no rows;
   *         - the rows combined through `MultipleOffsets` when every row is flagged mergeable;
   *         - otherwise the conversion of the row whose id equals `projectionId`, if there is one.
   */
  private def readPrimitiveOffset[Offset](): Future[Option[Offset]] = {
    if (!settings.isOffsetTableDefined) {
      // Offset table is not configured: nothing to read.
      Future.successful(None)
    } else {
      r2dbcExecutor
        .select("read offset")(
          connection => {
            logger.trace("reading offset for [{}]", projectionId)
            connection.createStatement(selectOffsetSql).bind(0, projectionId.name)
          },
          row => {
            val storedOffset = row.get("current_offset", classOf[String])
            val storedManifest = row.get("manifest", classOf[String])
            val isMergeable = row.get("mergeable", classOf[java.lang.Boolean])
            val projectionKey = row.get("projection_key", classOf[String])

            // The row's own key (not this store's key) identifies which projection instance it belongs to.
            SingleOffset(ProjectionId(projectionId.name, projectionKey), storedManifest, storedOffset, isMergeable)
          })
        .map { rows =>
          val found: Option[Offset] =
            if (rows.isEmpty) {
              None
            } else if (rows.exists(stored => !stored.mergeable)) {
              // At least one non-mergeable row: only the row for exactly this projection id is relevant.
              rows
                .find(stored => stored.id == projectionId)
                .map(stored => fromStorageRepresentation[Offset, Offset](stored))
            } else {
              // All rows are mergeable: combine them into one offset.
              val combined = fromStorageRepresentation[MergeableOffset[_], Offset](MultipleOffsets(rows.toList))
              Some(combined.asInstanceOf[Offset])
            }

          logger.trace2("found offset [{}] for [{}]", found, projectionId)

          found
        }
    }
  }