backend/app/services/IngestStorage.scala (102 lines of code) (raw):

package services

import java.io.InputStream
import java.util.UUID

import cats.syntax.either._
import com.amazonaws.services.s3.model.S3ObjectSummary
import model.{Languages, Uri}
import model.ingestion._
import play.api.libs.json.{JsError, JsSuccess, Json}
import utils.Logging
import utils.attempt.{Failure, IllegalStateFailure, JsonParseFailure, UnknownFailure}
import utils.aws.S3Client

import scala.jdk.CollectionConverters._
import scala.util.Try
import scala.util.control.NonFatal

/**
  * Operations on the store that holds files waiting to be ingested.
  *
  * Every operation returns an `Either`, with problems reported on the left as a [[Failure]].
  * `Key`, `FileContext` and `IngestMetadata` are not defined in this file
  * (presumably they come from `model.ingestion` - confirm).
  */
trait IngestStorage {
  /** Keys of the items currently available in the store. */
  def list: Either[Failure, Iterable[Key]]

  /** Opens the data stream for `key`. The caller receives an open stream. */
  def getData(key: Key): Either[Failure, InputStream]

  /** Loads the metadata stored alongside `key`. */
  def getMetadata(key: Key): Either[Failure, FileContext]

  /** Removes the item identified by `key`. */
  def delete(key: Key): Either[Failure, Unit]

  /** Moves the item identified by `key` out of the ingest store and into the dead letter store. */
  def sendToDeadLetterBucket(key: Key): Either[Failure, Unit]

  /** Moves dead-lettered items back so they can be ingested again. */
  def retryDeadLetters(): Either[Failure, Unit]
}

/**
  * [[IngestStorage]] backed by two S3 buckets: one holding files to ingest and one holding dead letters.
  *
  * Each key maps to two objects, addressed via `dataKey(key)` and `metadataKey(key)`. Those helpers and
  * `dataPrefix` are not defined in this file (presumably `model.ingestion` - confirm).
  *
  * Instances are created through the companion's `apply`, which checks that both buckets exist.
  */
class S3IngestStorage private(client: S3Client, ingestBucket: String, deadLetterBucket: String) extends IngestStorage with Logging {

  /**
    * Turns the S3 key of a data object into a `(timestamp, uuid)` pair.
    *
    * The key is expected to look like `<dataPrefix><timestamp>_<uuid>.data`. A key that does not follow
    * that shape makes this method throw (`toLong`, the index access or `UUID.fromString`).
    */
  private def parseKey(item: S3ObjectSummary): (Long, UUID) = {
    val components = item.getKey.stripPrefix(dataPrefix).stripSuffix(".data").split('_')
    val timestamp = components(0).toLong
    val uuid = UUID.fromString(components(1))

    timestamp -> uuid
  }

  /**
    * Lists the keys of all data objects under `dataPrefix` in the ingest bucket.
    *
    * Any exception (including a key that `parseKey` cannot parse) fails the whole listing with an
    * `UnknownFailure`.
    *
    * NOTE(review): this issues a single `listObjects` call and does not look at `isTruncated`, so only
    * the first page of results is returned - presumably later calls pick up the rest; confirm.
    */
  override def list = {
    Either.catchNonFatal {
      val result = client.aws.listObjects(ingestBucket, dataPrefix)
      val objs = result.getObjectSummaries.asScala

      objs.map(parseKey)
    }.leftMap(UnknownFailure.apply)
  }

  /**
    * Opens the content stream of the data object for `key` in the ingest bucket.
    *
    * The stream is returned open; this method does not close it.
    */
  override def getData(key: Key): Either[Failure, InputStream] = {
    Either.catchNonFatal(client.aws.getObject(ingestBucket, dataKey(key)).getObjectContent).leftMap(UnknownFailure.apply)
  }

  /**
    * Reads the metadata object for `key` from the ingest bucket and converts it into a `FileContext`.
    *
    * @return `Left(UnknownFailure)` if the object cannot be fetched or an exception is thrown while
    *         reading/parsing it, `Left(JsonParseFailure)` if the JSON does not describe an
    *         `IngestMetadata`, otherwise whatever `FileContext.fromIngestMetadata` produces.
    */
  override def getMetadata(key: Key): Either[Failure, FileContext] = {
    Either.catchNonFatal(client.aws.getObject(ingestBucket, metadataKey(key))).leftMap(UnknownFailure.apply).flatMap { s3Object =>
      // turn stream back into the original object
      val stream = s3Object.getObjectContent

      try {
        val json = Json.parse(stream)

        Json.fromJson[IngestMetadata](json) match {
          case JsSuccess(metadata, _) =>
            FileContext.fromIngestMetadata(metadata)

          case JsError(error) =>
            Left(JsonParseFailure(error))
        }
      } catch {
        // Json.parse throws on malformed input; keep that inside the Either instead of
        // letting it escape to the caller.
        case NonFatal(e) => Left(UnknownFailure(e))
      } finally {
        stream.close()
      }
    }
  }

  /**
    * Deletes the data object and then the metadata object for `key` from the ingest bucket.
    *
    * If the first delete throws, the second is not attempted.
    */
  override def delete(key: Key): Either[UnknownFailure, Unit] = {
    Either.catchNonFatal {
      client.aws.deleteObject(ingestBucket, dataKey(key))
      client.aws.deleteObject(ingestBucket, metadataKey(key))
    }.leftMap(UnknownFailure.apply)
  }

  /**
    * Copies the data and metadata objects for `key` into the dead letter bucket and, only if both
    * copies succeeded, deletes them from the ingest bucket.
    */
  override def sendToDeadLetterBucket(key: Key): Either[Failure, Unit] = {
    Either.catchNonFatal {
      client.aws.copyObject(ingestBucket, dataKey(key), deadLetterBucket, dataKey(key))
      client.aws.copyObject(ingestBucket, metadataKey(key), deadLetterBucket, metadataKey(key))
    }
      .leftMap(UnknownFailure.apply)
      // if copy succeeded, delete the files from the ingest bucket
      .flatMap(_ => delete(key))
  }

  /**
    * Moves every object in the dead letter bucket back into the ingest bucket.
    *
    * Objects are moved one at a time on a best-effort basis: a failure for one object is logged and
    * does not stop the remaining objects from being processed, nor does it turn the result into a `Left`.
    *
    * @return `Left(UnknownFailure)` if the bucket cannot be listed or the listing is truncated
    *         (too many objects to handle in a single listing), `Right(())` otherwise.
    */
  override def retryDeadLetters(): Either[UnknownFailure, Unit] = {
    Either.catchNonFatal {
      val result = client.aws.listObjects(deadLetterBucket)

      if (!result.isTruncated) {
        logger.info(s"Sending ${result.getObjectSummaries.size()} files from dead letter bucket to ingest bucket.")

        val (meta, data) = result.getObjectSummaries.asScala.toList.map(_.getKey).partition(k => k.startsWith("meta"))
        // ingestion starts once a 'data' file is available, so copy meta files first
        val keysWithMetaFirst = meta.concat(data)

        keysWithMetaFirst.foreach { key =>
          Try {
            client.aws.copyObject(deadLetterBucket, key, ingestBucket, key)
            // on success, clean up the file from the dead letter bucket
            client.aws.deleteObject(deadLetterBucket, key)
          }.failed.foreach { e =>
            // previously these failures were dropped silently; surface them without aborting the run
            logger.error(s"Failed to move $key from dead letter bucket to ingest bucket", e)
          }
        }
      } else {
        val msg = "Too many dead letter files to resync via API. Try using aws cli e.g aws s3 sync s3://deadletterbucket s3://ingestbucket"
        logger.error(msg)
        // java.lang.Error itself is treated as non-fatal by catchNonFatal, so this ends up as a Left
        throw new Error(msg)
      }
    }.leftMap(UnknownFailure.apply)
  }
}

object S3IngestStorage {
  /**
    * Builds an [[S3IngestStorage]] after checking that both buckets exist.
    *
    * @param client               S3 client used for all bucket operations
    * @param ingestBucketName     bucket holding files waiting to be ingested
    * @param deadLetterBucketName bucket holding files that were dead-lettered
    * @return `Left(IllegalStateFailure)` naming the first bucket found to be missing,
    *         `Left(UnknownFailure)` if the existence check throws an `Exception`,
    *         otherwise the storage instance.
    */
  def apply(client: S3Client, ingestBucketName: String, deadLetterBucketName: String): Either[Failure, S3IngestStorage] = {
    try {
      if (!client.doesBucketExist(ingestBucketName)) {
        Left(IllegalStateFailure(s"Bucket $ingestBucketName does not exist"))
      } else if (!client.doesBucketExist(deadLetterBucketName)) {
        Left(IllegalStateFailure(s"Bucket $deadLetterBucketName does not exist"))
      } else {
        Right(new S3IngestStorage(client, ingestBucketName, deadLetterBucketName))
      }
    } catch {
      case ex: Exception => Left(UnknownFailure(ex))
    }
  }
}