app/data/Dynamo.scala (156 lines of code) (raw):
package data
import org.scanamo.{DynamoFormat, Scanamo, Table => ScanamoTable}
import org.scanamo.ops.ScanamoOps
import org.scanamo.generic.auto._
import models.{Bake, BakeLog, BaseImage, Recipe}
import services.Loggable
import software.amazon.awssdk.services.dynamodb.DynamoDbClient
import software.amazon.awssdk.services.dynamodb.model._
import scala.annotation.tailrec
import scala.util.Try
class Dynamo(val client: DynamoDbClient, stage: String) extends Loggable {
import Dynamo._
import DynamoFormats._
object Tables {
private def table[A: DynamoFormat](
definition: CreateTableRequest
): TableWrapper[A] = {
val name = definition.tableName
val scanamoTable = ScanamoTable[A](name)
TableWrapper(definition, scanamoTable)
}
private def generateKeySchemaElement(
atttributeName: String,
keyType: KeyType
): KeySchemaElement = {
KeySchemaElement
.builder()
.attributeName(atttributeName)
.keyType(keyType)
.build()
}
def generateAttributeDefinition(
attributeName: String,
attributeType: ScalarAttributeType
): AttributeDefinition = {
AttributeDefinition
.builder()
.attributeName(attributeName)
.attributeType(attributeType)
.build()
}
def generateProvisionedThroughtput(
readCapacity: Long,
writeCapacity: Long
): ProvisionedThroughput = {
ProvisionedThroughput
.builder()
.readCapacityUnits(readCapacity)
.writeCapacityUnits(writeCapacity)
.build()
}
val baseImages = table[BaseImage](
CreateTableRequest
.builder()
.keySchema(generateKeySchemaElement("id", KeyType.HASH))
.attributeDefinitions(
generateAttributeDefinition("id", ScalarAttributeType.S)
)
.tableName(tableName("base-images"))
.provisionedThroughput(generateProvisionedThroughtput(1L, 1L))
.build()
)
val recipes = table[Recipe.DbModel](
CreateTableRequest
.builder()
.keySchema(generateKeySchemaElement("id", KeyType.HASH))
.attributeDefinitions(
generateAttributeDefinition("id", ScalarAttributeType.S)
)
.tableName(tableName("recipes"))
.provisionedThroughput(generateProvisionedThroughtput(1L, 1L))
.build()
)
val bakes = table[Bake.DbModel](
CreateTableRequest
.builder()
.keySchema(
generateKeySchemaElement("recipeId", KeyType.HASH),
generateKeySchemaElement("buildNumber", KeyType.RANGE)
)
.attributeDefinitions(
generateAttributeDefinition("recipeId", ScalarAttributeType.S),
generateAttributeDefinition("buildNumber", ScalarAttributeType.N)
)
.tableName(tableName("bakes"))
.provisionedThroughput(generateProvisionedThroughtput(1L, 1L))
.build()
)
val bakeLogs = table[BakeLog](
CreateTableRequest
.builder()
.keySchema(
generateKeySchemaElement("bakeId", KeyType.HASH),
generateKeySchemaElement("logNumber", KeyType.RANGE)
)
.attributeDefinitions(
generateAttributeDefinition("bakeId", ScalarAttributeType.S),
generateAttributeDefinition("logNumber", ScalarAttributeType.N)
)
.tableName(tableName("bake-logs"))
.provisionedThroughput(generateProvisionedThroughtput(10L, 10L))
.build()
)
}
def generateDescribeTableRequest(tableName: String): DescribeTableRequest = {
DescribeTableRequest
.builder()
.tableName(tableName)
.build()
}
def initTables(): Unit = {
import Tables._
for (table <- Seq(baseImages, recipes, bakes, bakeLogs))
createTableIfDoesNotExist(table)
}
private def tableName(suffix: String) = s"amigo-$stage-$suffix"
private def createTableIfDoesNotExist(table: TableWrapper[_]): Unit = {
if (
Try(
client.describeTable(generateDescribeTableRequest(table.name))
).isFailure
) {
log.info(s"Creating Dynamo table ${table.name} ...")
client.createTable(table.definition)
waitForTableToBecomeActive(table.name)
} else {
log.info(s"Found Dynamo table ${table.name}")
}
}
@tailrec
private def waitForTableToBecomeActive(name: String): Unit = {
Try(
Option(client.describeTable(generateDescribeTableRequest(name)))
).toOption.flatten match {
case Some(table)
if table.table.tableStatus.toString == TableStatus.ACTIVE.toString =>
()
case _ =>
log.info(s"Waiting for table $name to become active ...")
Thread.sleep(500L)
waitForTableToBecomeActive(name)
}
}
}
object Dynamo {
case class TableWrapper[A](
private[Dynamo] val definition: CreateTableRequest,
table: ScanamoTable[A]
) {
val name = definition.tableName
}
implicit class RichScanamoOps[A](val ops: ScanamoOps[A]) extends AnyVal {
def exec()(implicit dynamo: Dynamo): A =
Scanamo.apply(dynamo.client).exec(ops)
}
}