def readOffset[Offset]()

in jdbc/src/main/scala/org/apache/pekko/projection/jdbc/internal/JdbcOffsetStore.scala [108:149]


  /**
   * Loads the stored offset for the given projection.
   *
   * Executes the dialect's `readOffsetQuery` with `projectionId.name` bound as its first parameter and
   * converts every returned row into a `SingleOffset` whose id pairs the projection name with the row's
   * `PROJECTION_KEY` column.
   *
   * @param projectionId identifies the projection whose offset is looked up
   * @tparam Offset the offset type expected by the caller
   * @return `None` when the query yields no rows; when every row is flagged mergeable, all rows wrapped in a
   *         `MultipleOffsets` and converted back from their storage representation; otherwise the converted
   *         row whose id equals `projectionId`, or `None` if no such row exists
   */
  def readOffset[Offset](projectionId: ProjectionId): Future[Option[Offset]] =
    withConnection(jdbcSessionFactory) { connection =>
      if (verboseLogging)
        logger.debug(
          "reading offset for [{}], using connection id [{}]",
          projectionId,
          System.identityHashCode(connection))

      // the statement is handed to tryWithResource together with the code that uses it
      tryWithResource(connection.prepareStatement(settings.dialect.readOffsetQuery)) { statement =>
        statement.setString(1, projectionId.name)

        // same for the result set
        tryWithResource(statement.executeQuery()) { rows =>
          // Decodes the row the cursor currently points at; columns are read in a fixed order.
          def currentRow(): SingleOffset = {
            val rawOffset = rows.getString("CURRENT_OFFSET")
            val offsetManifest = rows.getString("MANIFEST")
            val isMergeable = rows.getBoolean("MERGEABLE")
            val projectionKey = rows.getString("PROJECTION_KEY")
            SingleOffset(ProjectionId(projectionId.name, projectionKey), offsetManifest, rawOffset, isMergeable)
          }

          // Advance the cursor until it is exhausted; toList forces all reads to happen inside this scope.
          val stored: List[SingleOffset] =
            Iterator.continually(rows.next()).takeWhile(identity).map(_ => currentRow()).toList

          val found = stored match {
            case Nil =>
              None
            case all if all.forall(_.mergeable) =>
              val merged = fromStorageRepresentation[MergeableOffset[_], Offset](MultipleOffsets(all))
              Some(merged.asInstanceOf[Offset])
            case all =>
              all.find(_.id == projectionId).map(fromStorageRepresentation[Offset, Offset])
          }

          if (verboseLogging) logger.debug2("found offset [{}] for [{}]", found, projectionId)

          found
        }
      }
    }