in app/services/DynamoService.scala [60:76]
/**
 * Writes `writeRequests` in groups of at most `BATCH_SIZE`, handing one group to `putAll`
 * per tick of a stream paced at 2 seconds.
 *
 * The queue of pending groups is the fold state. On every tick the group at the head of the
 * queue is passed to `putAll`; if `putAll` yields a non-empty list (presumably the requests
 * that were not processed), that list is put back at the head of the queue so it is attempted
 * again on the following tick. Folding stops as soon as the queue is empty.
 *
 * @param writeRequests the requests to write; for an empty list `putAll` is never called
 * @return an effect that yields `Unit` once the queue has been drained, or fails with a
 *         [[DynamoPutError]]
 */
protected def putAllBatched(
    writeRequests: List[WriteRequest]): ZIO[ZEnv, DynamoPutError, Unit] = {
  val pendingGroups = writeRequests.grouped(BATCH_SIZE).toList

  // Kept as a `def` so the error (and its Throwable) is only built if it is actually needed.
  def timeoutFailure =
    DynamoPutError(new Throwable("Timed out writing batches to dynamodb"))

  // One tick every 2 seconds; each tick drives exactly one fold step below.
  // NOTE(review): the 1-minute timeout is attached to this tick stream, so it looks like it
  // limits the wait for each tick rather than the total time spent writing — confirm intent.
  val ticks = ZStream(()).forever
    .fixed(2.seconds)
    .timeoutError(timeoutFailure)(1.minute)

  ticks
    .foldWhileM(pendingGroups)(_.nonEmpty) { (queue, _) =>
      queue match {
        // The continuation check stops the fold before this case is hit; kept for exhaustiveness.
        case Nil => ZIO.succeed(Nil)
        case current :: rest =>
          putAll(current).map {
            case Nil      => rest
            case leftover => leftover :: rest // retry whatever came back on the next tick
          }
      }
    }
    .unit // the drained (empty) queue carries no information for the caller
}