app/services/DynamoChannelTests.scala (306 lines of code) (raw):

package services import com.typesafe.scalalogging.StrictLogging import io.circe.generic.auto._ import io.circe.syntax._ import io.circe.{Decoder, Encoder} import models.DynamoErrors._ import models._ import software.amazon.awssdk.services.dynamodb.DynamoDbClient import software.amazon.awssdk.services.dynamodb.model._ import utils.Circe.{dynamoMapToJson, jsonToDynamo} import zio.blocking.effectBlocking import zio.duration.durationInt import zio.stream.ZStream import zio.{ZEnv, ZIO} import java.time.OffsetDateTime import scala.jdk.CollectionConverters._ class DynamoChannelTests(stage: String, client: DynamoDbClient) extends DynamoService(stage, client) with StrictLogging { protected val tableName = s"support-admin-console-channel-tests-$stage" private val campaignNameIndex = "campaignName-name-index" private def buildKey(channel: Channel, testName: String): java.util.Map[String, AttributeValue] = Map( "channel" -> AttributeValue.builder.s(channel.toString).build, "name" -> AttributeValue.builder.s(testName).build ).asJava /** * Attempts to retrieve a test from dynamodb. Fails if the test does not exist. */ private def get(testName: String, channel: Channel): ZIO[ZEnv, DynamoGetError, java.util.Map[String, AttributeValue]] = effectBlocking { val query = QueryRequest .builder .tableName(tableName) .keyConditionExpression("channel = :channel AND #name = :name") .expressionAttributeValues( Map( ":channel" -> AttributeValue.builder.s(channel.toString).build, ":name" -> AttributeValue.builder.s(testName).build ).asJava ) .expressionAttributeNames(Map("#name" -> "name").asJava) // name is a reserved word in dynamodb .build() client .query(query) .items.asScala.headOption }.flatMap { case Some(item) => ZIO.succeed(item) case None => ZIO.fail(DynamoGetError(new Exception(s"Test does not exist: $channel/$testName"))) }.mapError(error => DynamoGetError(error) ) private def getAll(channel: Channel): ZIO[ZEnv, DynamoGetError, java.util.List[java.util.Map[String, AttributeValue]]] = effectBlocking { client.query( QueryRequest .builder .tableName(tableName) .keyConditionExpression("channel = :channel") .expressionAttributeValues(Map( ":channel" -> AttributeValue.builder.s(channel.toString).build, ":archived" -> AttributeValue.builder.s("Archived").build ).asJava) .expressionAttributeNames(Map( "#status" -> "status" ).asJava) .filterExpression("#status <> :archived") .build() ).items }.mapError(DynamoGetError) private def getAllInCampaign(campaignName: String): ZIO[ZEnv, DynamoGetError, java.util.List[java.util.Map[String, AttributeValue]]] = effectBlocking { client.query( QueryRequest .builder .tableName(tableName) .keyConditionExpression("campaignName = :campaignName") .indexName(campaignNameIndex) .expressionAttributeValues(Map( ":campaignName" -> AttributeValue.builder.s(campaignName).build ).asJava) .build() ).items }.mapError(DynamoGetError) private def update(updateRequest: UpdateItemRequest): ZIO[ZEnv, DynamoError, Unit] = effectBlocking { val result = client.updateItem(updateRequest) logger.info(s"UpdateItemResponse: $result") () }.mapError { case err: ConditionalCheckFailedException => DynamoNoLockError(err) case other => DynamoPutError(other) } def getTest[T <: ChannelTest[T] : Decoder](testName: String, channel: Channel): ZIO[ZEnv, DynamoGetError, T] = get(testName, channel) .map(item => dynamoMapToJson(item).as[T]) .flatMap { case Right(test) => ZIO.succeed(test) case Left(error) => ZIO.fail(DynamoGetError(error)) } def getAllTests[T <: ChannelTest[T] : Decoder](channel: Channel): ZIO[ZEnv, DynamoGetError, List[T]] = getAll(channel).map(results => results.asScala .map(item => dynamoMapToJson(item).as[T]) .flatMap { case Right(test) => Some(test) case Left(error) => logger.error(s"Failed to decode item from Dynamo: ${error.getMessage}") None } .toList .sortBy(_.priority) ) // Does not decode the Dynamodb data def getRawTests(channel: Channel, testNames: List[String]): ZIO[ZEnv, DynamoGetError, List[java.util.Map[String, AttributeValue]]] = { // Build a batch item request val items = testNames.map(testName => buildKey(channel, testName)) val keysAndAttributes = KeysAndAttributes.builder().keys(items.asJava).build() val request = BatchGetItemRequest.builder() .requestItems(Map(tableName -> keysAndAttributes).asJava) .build() effectBlocking { client.batchGetItem(request) .responses().asScala.get(tableName).map(_.asScala.toList).getOrElse(Nil) }.mapError(DynamoGetError) } def getTests[T <: ChannelTest[T] : Decoder](channel: Channel, testNames: List[String]): ZIO[ZEnv, DynamoGetError, List[T]] = { getRawTests(channel, testNames).map(rawTests => { rawTests.flatMap(rawTest => dynamoMapToJson(rawTest).as[T] match { case Right(test) => Some(test) case Left(error) => logger.error(s"Failed to decode item from Dynamo: ${error.getMessage}") None } ) }) } // Returns all tests in a campaign, sorted by channel import models.ChannelTest.channelTestDecoder def getAllTestsInCampaign(campaignName: String): ZIO[ZEnv, DynamoGetError, List[ChannelTest[_]]] = getAllInCampaign(campaignName) .map(results => results.asScala .map(item => dynamoMapToJson(item).as[ChannelTest[_]]) .flatMap { case Right(test) => Some(test) case Left(error) => logger.error(s"Failed to decode item from Dynamo: ${error.getMessage}") None } .toList .sortBy(_.channel.toString) ) def createOrUpdateTests[T <: ChannelTest[T] : Encoder](tests: List[T], channel: Channel): ZIO[ZEnv, DynamoPutError, Unit] = { val writeRequests = tests.zipWithIndex.map { case (test, priority) => // Add the priority and channel fields, which we don't have in S3 val prioritised = test.withPriority(priority).withChannel(channel) val item = jsonToDynamo(prioritised.asJson).m() WriteRequest.builder.putRequest( PutRequest .builder .item(item) .build() ).build() } logger.info(s"About to batch put: $writeRequests") putAllBatched(writeRequests) } /** * With an UpdateItem request we have to provide an expression to specify each attribute to be updated. * We do this by iterating over the attributes in `item` and building an expression */ private def buildUpdateTestExpression(item: Map[String, AttributeValue]): String = { val subExprs = item.foldLeft[List[String]](Nil) { case (acc, (key, value)) => s"$key = :$key" +: acc } s"set ${subExprs.mkString(", ")} remove lockStatus" // Unlock the test at the same time } def updateTest[T <: ChannelTest[T] : Encoder](test: T, channel: Channel, email: String): ZIO[ZEnv, DynamoError, Unit] = { val item = jsonToDynamo(test.asJson).m().asScala.toMap - "status" - // Do not update status - this is a separate action "priority" - // Do not update priority - this is a separate action "lockStatus" - // Unlock by removing lockStatus "name" - // key field "channel" // key field val updateExpression = buildUpdateTestExpression(item) val attributeValues = item.map { case (key, value) => s":$key" -> value } // Add email, for the conditional update val attributeValuesWithEmail = attributeValues + (":email" -> AttributeValue.builder.s(email).build) val updateRequest = UpdateItemRequest .builder .tableName(tableName) .key(buildKey(channel, test.name)) .updateExpression(updateExpression) .expressionAttributeValues(attributeValuesWithEmail.asJava) .conditionExpression("lockStatus.email = :email") .build() update(updateRequest) } // Returns the value of the bottom priority test - which is the highest value, because 0 is top priority private def getBottomPriority[T <: ChannelTest[T] : Decoder](channel: Channel): ZIO[ZEnv, DynamoError, Int] = getAllTests[T](channel) .map(allTests => allTests.flatMap(_.priority).maxOption.getOrElse(0)) // Creates a new test, with bottom priority def createTest[T <: ChannelTest[T] : Encoder : Decoder](test: T, channel: Channel): ZIO[ZEnv, DynamoError, T] = getBottomPriority[T](channel) .flatMap(bottomPriority => { val priority = bottomPriority + 1 val enrichedTest = test.withChannel(channel).withPriority(priority) val item = jsonToDynamo(enrichedTest.asJson).m() val request = PutItemRequest .builder .tableName(tableName) .item(item) // Do not overwrite if already in dynamo .conditionExpression("attribute_not_exists(#name)") .expressionAttributeNames(Map("#name" -> "name").asJava) .build() put(request).map(_ => enrichedTest) }) def lockTest(testName: String, channel: Channel, email: String, force: Boolean): ZIO[ZEnv, DynamoError, Unit] = { val lockStatus = LockStatus( locked = true, email = Some(email), timestamp = Some(OffsetDateTime.now()) ) val request = { val builder = UpdateItemRequest .builder .tableName(tableName) .key(buildKey(channel, testName)) .updateExpression("set lockStatus = :lockStatus") .expressionAttributeValues(Map( ":lockStatus" -> jsonToDynamo(lockStatus.asJson), ).asJava) .expressionAttributeNames(Map("#name" -> "name").asJava) val itemExistsExpression = "attribute_exists(#name)" // only update if it already exists in the table if (!force) { // Check it isn't already locked builder.conditionExpression(s"$itemExistsExpression and attribute_not_exists(lockStatus.email)") } else { builder.conditionExpression(itemExistsExpression) } builder.build() } update(request) } // Removes the lockStatus attribute if the user currently has it locked def unlockTest(testName: String, channel: Channel, email: String): ZIO[ZEnv, DynamoError, Unit] = { val request = UpdateItemRequest .builder .tableName(tableName) .key(buildKey(channel, testName)) .updateExpression("remove lockStatus") .conditionExpression("lockStatus.email = :email") .expressionAttributeValues(Map( ":email" -> AttributeValue.builder.s(email).build ).asJava) .build() update(request) } def deleteTests(testNames: List[String], channel: Channel): ZIO[ZEnv, DynamoPutError, Unit] = { val deleteRequests = testNames.map { testName => WriteRequest .builder .deleteRequest( DeleteRequest .builder .key(buildKey(channel, testName)) .build() ) .build() } logger.info(s"About to batch delete: $deleteRequests") putAllBatched(deleteRequests) } // Set `priority` attribute based on the ordering of the List def setPriorities(testNames: List[String], channel: Channel): ZIO[ZEnv, DynamoError, Unit] = { val items = testNames.zipWithIndex.map { case (testName, priority) => TransactWriteItem .builder .update( Update .builder .tableName(tableName) .key(buildKey(channel, testName)) .expressionAttributeValues(Map( ":priority" -> AttributeValue.builder.n(priority.toString).build, ":name" -> AttributeValue.builder.s(testName).build ).asJava) .expressionAttributeNames(Map("#name" -> "name").asJava) .updateExpression("SET priority = :priority") .conditionExpression("#name = :name") // only update if it already exists in the table .build() ) .build() } putAllBatchedTransaction(items) } def updateStatuses(testNames: List[String], channel: Channel, status: Status): ZIO[ZEnv, DynamoError, Unit] = { val items = testNames.map { testName => TransactWriteItem .builder .update( Update .builder .tableName(tableName) .key(buildKey(channel, testName)) .expressionAttributeValues(Map( ":status" -> AttributeValue.builder.s(status.toString).build, ":name" -> AttributeValue.builder.s(testName).build ).asJava) .expressionAttributeNames(Map( "#status" -> "status", "#name" -> "name" ).asJava) .updateExpression("SET #status = :status") .conditionExpression("#name = :name") // only update if it already exists in the table .build() ) .build() } putAllBatchedTransaction(items) } }