app/services/DynamoChannelTestsAudit.scala (101 lines of code) (raw):

package services import com.typesafe.scalalogging.StrictLogging import io.circe.generic.semiauto.{deriveDecoder, deriveEncoder} import io.circe.syntax.EncoderOps import io.circe.{Decoder, Encoder} import models.ChannelTest import software.amazon.awssdk.services.dynamodb.DynamoDbClient import software.amazon.awssdk.services.dynamodb.model.{AttributeValue, _} import zio.{ZEnv, ZIO} import models.DynamoErrors._ import DynamoChannelTestsAudit.{ChannelTestAudit, getTimeToLive} import utils.Circe.{dynamoMapToJson, jsonToDynamo} import zio.blocking.effectBlocking import java.time.OffsetDateTime import scala.jdk.CollectionConverters.{ListHasAsScala, MapHasAsJava} object DynamoChannelTestsAudit { // The model that we write to the audit table case class ChannelTestAudit[T : Encoder : Decoder]( channelAndName: String, // The partition key is the channel and test name combined timestamp: OffsetDateTime, // The range key is the timestamp of the change ttlInSecondsSinceEpoch: Long, // Expiry time in seconds since Epoch userEmail: String, // The email address of the user making the change item: T // The new state of the item being changed ) implicit def encoder[T : Encoder : Decoder] = deriveEncoder[ChannelTestAudit[T]] implicit def decoder[T : Encoder : Decoder] = deriveDecoder[ChannelTestAudit[T]] private val RetentionPeriodInYears = 1 def getTimeToLive(timestamp: OffsetDateTime): OffsetDateTime = timestamp.plusYears(RetentionPeriodInYears) } class DynamoChannelTestsAudit(stage: String, client: DynamoDbClient) extends DynamoService(stage, client) with StrictLogging { protected val tableName = s"support-admin-console-channel-tests-audit-$stage" private def getAuditsFromDynamo(channelAndName: String): ZIO[ZEnv, DynamoGetError, java.util.List[java.util.Map[String, AttributeValue]]] = effectBlocking { client.query( QueryRequest .builder .tableName(tableName) .keyConditionExpression("channelAndName = :channelAndName") .expressionAttributeValues(Map( ":channelAndName" -> AttributeValue.builder.s(channelAndName).build ).asJava) .build() ).items }.mapError(DynamoGetError) def createAudit[T <: ChannelTest[T] : Encoder : Decoder](test: T, userEmail: String): ZIO[ZEnv, DynamoError, Unit] = { val channelAndName = s"${test.channel.get}_${test.name}" val timestamp = OffsetDateTime.now() val ttlInSecondsSinceEpoch = getTimeToLive(timestamp).toInstant.getEpochSecond val audit = ChannelTestAudit( channelAndName, timestamp, ttlInSecondsSinceEpoch, userEmail, item = test ) val request = PutItemRequest .builder .tableName(tableName) .item( jsonToDynamo(audit.asJson).m() ) .build() put(request) } // Batch write many audits def createAudits[T <: ChannelTest[T] : Encoder : Decoder](tests: List[T], userEmail: String): ZIO[ZEnv, DynamoPutError, Unit] = { val timestamp = OffsetDateTime.now() val ttlInSecondsSinceEpoch = getTimeToLive(timestamp).toInstant.getEpochSecond val writeRequests = tests.map { test => val channelAndName = s"${test.channel.get}_${test.name}" val audit = ChannelTestAudit( channelAndName, timestamp, ttlInSecondsSinceEpoch, userEmail, item = test ) val item = jsonToDynamo(audit.asJson).m() WriteRequest.builder.putRequest( PutRequest .builder .item(item) .build() ).build() } putAllBatched(writeRequests) } def getAuditsForChannelTest(channel: String, name: String): ZIO[ZEnv, DynamoError, List[ChannelTestAudit[ChannelTest[_]]]] = { val channelAndName = s"${channel}_$name" getAuditsFromDynamo(channelAndName).map { results => results.asScala .map(item => dynamoMapToJson(item).as[ChannelTestAudit[ChannelTest[_]]]) .flatMap { case Right(audit) => Some(audit) case Left(error) => logger.error(s"Failed to decode audit item from Dynamo: ${error.getMessage}") None } .toList .sortBy(_.timestamp) } } }