app/services/DynamoService.scala (80 lines of code) (raw):

package services

import com.typesafe.scalalogging.StrictLogging
import models.DynamoErrors.{DynamoDuplicateNameError, DynamoError, DynamoPutError}
import software.amazon.awssdk.services.dynamodb.DynamoDbClient
import software.amazon.awssdk.services.dynamodb.model.{
  BatchWriteItemRequest,
  ConditionalCheckFailedException,
  PutItemRequest,
  ReturnConsumedCapacity,
  TransactWriteItem,
  TransactWriteItemsRequest,
  WriteRequest
}
import zio.{ZEnv, ZIO}
import zio.blocking.effectBlocking
import zio.stream.ZStream
import zio.duration.durationInt

import scala.jdk.CollectionConverters._

/**
 * Shared functionality for DynamoDb services.
 *
 * Subclasses supply the `tableName` and reuse the protected write helpers below. Every call to
 * the client is wrapped in `effectBlocking`, and failures are mapped into the project's
 * `DynamoError` types.
 *
 * @param stage  stage name supplied by the subclass (not referenced by the code in this class)
 * @param client DynamoDb client used for every request issued by this class
 */
abstract class DynamoService(stage: String, client: DynamoDbClient) extends StrictLogging {

  /** Name of the table that batch writes in this class are addressed to. */
  protected val tableName: String

  /**
   * Executes a single `PutItem` request and logs the response.
   *
   * @param putRequest the fully built request to send
   * @return an effect that fails with `DynamoDuplicateNameError` when the client throws a
   *         `ConditionalCheckFailedException`, and with `DynamoPutError` for any other failure
   */
  protected def put(putRequest: PutItemRequest): ZIO[ZEnv, DynamoError, Unit] =
    effectBlocking {
      val result = client.putItem(putRequest)
      logger.info(s"PutItemResponse: $result")
      ()
    }.mapError {
      case err: ConditionalCheckFailedException => DynamoDuplicateNameError(err)
      case other                                => DynamoPutError(other)
    }

  /**
   * Sends a batch of write requests, and returns any unprocessed items.
   *
   * An empty `writeRequests` list yields `Nil` without calling the client: DynamoDB rejects a
   * `BatchWriteItem` call whose request list for a table is empty, so there is nothing useful
   * to send.
   *
   * @param writeRequests the requests to write to `tableName` in one `BatchWriteItem` call
   * @return the requests reported back as unprocessed for `tableName` (`Nil` when there are none)
   */
  protected def putAll(writeRequests: List[WriteRequest])
    : ZIO[ZEnv, DynamoPutError, List[WriteRequest]] =
    if (writeRequests.isEmpty) {
      ZIO.succeed(Nil)
    } else {
      effectBlocking {
        val batchWriteRequest = BatchWriteItemRequest.builder
          .requestItems(Map(tableName -> writeRequests.asJava).asJava)
          .returnConsumedCapacity(ReturnConsumedCapacity.TOTAL)
          .build()

        val result = client.batchWriteItem(batchWriteRequest)
        logger.info(s"BatchWriteItemResponse: $result")

        result
          .unprocessedItems()
          .asScala
          .get(tableName)
          .map(items => items.asScala.toList)
          .getOrElse(Nil)
      }.mapError(DynamoPutError)
    }

  /** Number of items per batch used by the batched write helpers. */
  protected val BATCH_SIZE = 25

  /**
   * Dynamodb limits us to batches of 25 items, and may return unprocessed items in the response.
   * This function groups items into batches of 25, and also checks the `unprocessedItems` in
   * case we need to send any again.
   * It uses an infinite zio stream to do this, pausing between batches to avoid any throttling.
   * It stops processing the stream when the list of batches is empty.
   *
   * NOTE(review): `timeoutError` is attached to the tick stream, upstream of the fold that
   * performs the writes, so it presumably limits the wait for each tick rather than the total
   * run time. Confirm whether an overall 1 minute limit was intended.
   *
   * @param writeRequests all requests to write; an empty list completes without any call
   * @return an effect that completes once every batch, including re-sent unprocessed items, has
   *         been accepted, or fails with `DynamoPutError`
   */
  protected def putAllBatched(
      writeRequests: List[WriteRequest]): ZIO[ZEnv, DynamoPutError, Unit] = {
    val batches = writeRequests.grouped(BATCH_SIZE).toList

    ZStream(()).forever
      .fixed(2.seconds) // wait 2 seconds between batches
      .timeoutError(DynamoPutError(new Throwable("Timed out writing batches to dynamodb")))(
        1.minute)
      .foldWhileM(batches)(_.nonEmpty) {
        case (nextBatch :: remainingBatches, _) =>
          putAll(nextBatch).map {
            case Nil => remainingBatches
            // Put the unprocessed items back at the front so they are sent on the next tick
            case unprocessed => unprocessed :: remainingBatches
          }
        case (Nil, _) => ZIO.succeed(Nil)
      }
      .unit // on success, the result value isn't meaningful
  }

  /**
   * Dynamodb limits us to batches of 25 items.
   * This function groups items into batches of 25. Each batch is sent as a transaction, and if
   * any transaction fails then an error is returned to the client (no retries).
   * It uses a zio stream to do this, pausing between batches to avoid any throttling, with a
   * 1 minute `timeoutError` attached to the paced stream.
   *
   * NOTE(review): the timeout is applied before `mapM`, so it presumably bounds the wait for
   * each batch to be emitted rather than the whole run. Confirm the intended scope.
   *
   * @param items all transactional write items; an empty list completes without any call
   * @return an effect that completes when every batch has been written, or fails with
   *         `DynamoPutError` from the first failing transaction
   */
  protected def putAllBatchedTransaction(
      items: List[TransactWriteItem]): ZIO[ZEnv, DynamoPutError, Unit] = {
    val batches = items.grouped(BATCH_SIZE).toList

    ZStream
      .fromIterable(batches)
      .fixed(2.seconds) // wait 2 seconds between batches
      .timeoutError(DynamoPutError(new Throwable("Timed out writing batches to dynamodb")))(
        1.minute)
      .mapM(putAllTransaction)
      .runDrain // results are all Unit, so drain instead of collecting them
  }

  /**
   * Sends one group of items as a single `TransactWriteItems` request and logs the response.
   *
   * @param items the items making up the transaction
   * @return an effect that fails with `DynamoPutError` if the client call throws
   */
  private def putAllTransaction(
      items: List[TransactWriteItem]): ZIO[ZEnv, DynamoPutError, Unit] =
    effectBlocking {
      val request = TransactWriteItemsRequest.builder
        .transactItems(items.asJava)
        .build()

      val result = client.transactWriteItems(request)
      logger.info(s"TransactWriteItemsResponse: $result")
      ()
    }.mapError(DynamoPutError)
}