in slick/src/main/scala/org/apache/pekko/projection/slick/internal/SlickOffsetStore.scala [105:117]
/**
 * Builds the database action that writes a single offset row.
 *
 * The row is assembled from the fields of `rep` (projection name and key taken
 * from `rep.id`, the offset string, its manifest and the mergeable flag)
 * together with the supplied timestamp, and is passed to `insertOrUpdate`
 * on `offsetTable`.
 *
 * @param rep              storage representation of the offset to persist
 * @param millisSinceEpoch timestamp stored alongside the offset
 * @return the action produced by `offsetTable.insertOrUpdate`; this method only
 *         constructs it and does not run it
 */
private def newRow[Offset](rep: SingleOffset, millisSinceEpoch: Long): DBIO[_] = {
  val row = OffsetRow(
    rep.id.name,
    rep.id.key,
    rep.offsetStr,
    rep.manifest,
    rep.mergeable,
    millisSinceEpoch)
  offsetTable.insertOrUpdate(row)
}
/**
 * Builds the database action that stores `offset` for the given projection.
 *
 * The offset is first converted with `toStorageRepresentation`. A single
 * offset yields one row write; a `MultipleOffsets` value yields one row write
 * per contained representation, combined with `DBIO.sequence`.
 *
 * @param projectionId identifies the projection the offset belongs to
 * @param offset       the offset value to persist
 * @return an action describing the write(s); this method only constructs it
 *         and does not run it
 */
def saveOffset[Offset](projectionId: ProjectionId, offset: Offset): slick.dbio.DBIO[_] = {
  // Read the clock once so every row written by this call carries the same timestamp.
  val timestampMillis = clock.instant().toEpochMilli
  val representation = toStorageRepresentation(projectionId, offset)
  representation match {
    case single: SingleOffset =>
      newRow(single, timestampMillis)
    case MultipleOffsets(parts) =>
      val writes = parts.map(part => newRow(part, timestampMillis))
      DBIO.sequence(writes)
  }
}