in image-loader/app/controllers/ImageLoaderController.scala [116:184]
/**
  * Handles one SQS message that refers to an object in the ingest bucket.
  *
  * The whole body runs inside a `Future` so that non-fatal exceptions thrown while
  * inspecting the message or the S3 object end up as a failed future; the nested
  * future produced by the processing branch is flattened before returning.
  *
  * Outcomes, in the order they are checked:
  *  - the S3 key cannot be extracted from the message: the error is logged and
  *    `failedIngestsFromQueue` is incremented;
  *  - `config.maybeUploadLimitInBytes` is defined and smaller than the object's content
  *    length: the object is moved to the fail bucket, the upload status is set to
  *    `Failed` (UI uploads only) and `failedIngestsFromQueue` is incremented;
  *  - the approximate receive count is greater than 2: `abandonedMessagesFromQueue` is
  *    incremented, the object is moved to the fail bucket and the upload status is set
  *    to `Failed` (UI uploads only);
  *  - otherwise `attemptToProcessIngestedFile` is called; on success the object is
  *    deleted from the ingest bucket, on failure it is moved to the fail bucket.
  *
  * @param sqsMessage     the message received from the ingest queue
  * @param basicLogMarker base log marker; it is enriched with details of the S3 object
  *                       once the key has been extracted
  * @return a future that completes once the message has been dealt with
  */
private def handleMessageFromIngestBucket(sqsMessage:SQSMessage)(basicLogMarker: LogMarker): Future[Unit] = Future[Future[Unit]]{
  logger.info(basicLogMarker, sqsMessage.toString)
  extractS3KeyFromSqsMessage(sqsMessage) match {
    case Failure(exception) =>
      metrics.failedIngestsFromQueue.increment()
      logger.error(basicLogMarker, s"Failed to parse s3 data from SQS message", exception)
      Future.unit
    case Success(key) =>
      val s3IngestObject = S3IngestObject(key, store)(basicLogMarker)
      val isUiUpload = s3IngestObject.maybeMediaIdFromUiUpload.isDefined
      implicit val logMarker: LogMarker = basicLogMarker ++ Map(
        "uploadedBy" -> s3IngestObject.uploadedBy,
        "uploadedTime" -> s3IngestObject.uploadTime,
        "contentLength" -> s3IngestObject.contentLength,
        "filename" -> s3IngestObject.filename,
        "isUiUpload" -> isUiUpload,
      ) ++ s3IngestObject.maybeMediaIdFromUiUpload.map("mediaId" -> _).toMap
      val metricDimensions = List(
        new Dimension().withName("UploadedBy").withValue(s3IngestObject.uploadedBy),
        new Dimension().withName("IsUiUpload").withValue(isUiUpload.toString),
      )

      // Shared by the "too large" and "abandoned" branches: warn, move the object to the
      // fail bucket and, for UI uploads, record the failure in the upload status table.
      def moveToFailBucketAndMarkFailed(errorMessage: String): Unit = {
        logger.warn(logMarker, errorMessage)
        store.moveObjectToFailedBucket(s3IngestObject.key)
        s3IngestObject.maybeMediaIdFromUiUpload foreach { imageId =>
          uploadStatusTable.updateStatus( // fire & forget, since there's nothing else we can do
            imageId, UploadStatus(StatusType.Failed, Some(errorMessage))
          )
        }
      }

      val approximateReceiveCount = getApproximateReceiveCount(sqsMessage)
      // Defined only when a limit is configured AND this object is larger than it, so the
      // limit can be reported without calling `.get` on the option.
      val maybeExceededLimitInBytes =
        config.maybeUploadLimitInBytes.filter(_ < s3IngestObject.contentLength)

      maybeExceededLimitInBytes match {
        case Some(limitInBytes) =>
          moveToFailBucketAndMarkFailed(
            s"File size exceeds the maximum allowed size (${limitInBytes / 1_000_000}MB). Moving to fail bucket."
          )
          metrics.failedIngestsFromQueue.incrementBothWithAndWithoutDimensions(metricDimensions)
          Future.unit
        case None if approximateReceiveCount > 2 =>
          metrics.abandonedMessagesFromQueue.incrementBothWithAndWithoutDimensions(metricDimensions)
          moveToFailBucketAndMarkFailed(
            s"File processing has been attempted $approximateReceiveCount times. Moving to fail bucket."
          )
          Future.unit
        case None =>
          attemptToProcessIngestedFile(s3IngestObject, isUiUpload)(logMarker) map { digestedFile =>
            metrics.successfulIngestsFromQueue.incrementBothWithAndWithoutDimensions(metricDimensions)
            logger.info(logMarker, s"Successfully processed image ${digestedFile.file.getName}")
            // NOTE(review): an exception thrown by this delete is handled by the generic
            // recover case below (counted as a failed ingest) - confirm that is intended.
            store.deleteObjectFromIngestBucket(s3IngestObject.key)
          } recover {
            case _: UnsupportedMimeTypeException =>
              metrics.failedIngestsFromQueue.incrementBothWithAndWithoutDimensions(metricDimensions)
              logger.info(logMarker, s"Unsupported mime type. Moving straight to fail bucket.")
              store.moveObjectToFailedBucket(s3IngestObject.key)
            case t: Throwable =>
              metrics.failedIngestsFromQueue.incrementBothWithAndWithoutDimensions(metricDimensions)
              logger.error(logMarker, s"Failed to process file. Moving to fail bucket.", t)
              store.moveObjectToFailedBucket(s3IngestObject.key)
          }
      }
  }
}.flatten