private def handleMessageFromIngestBucket()

in image-loader/app/controllers/ImageLoaderController.scala [116:184]


  /**
    * Handles a single SQS message.
    *
    * The S3 key is extracted from the message and wrapped in an [[S3IngestObject]]. Then exactly one of
    * the following happens:
    *
    *  - the key cannot be extracted: the failure is counted and logged, nothing else is done;
    *  - the object is larger than `config.maybeUploadLimitInBytes` (when a limit is configured):
    *    the object is moved to the fail bucket and counted as a failed ingest;
    *  - the message's approximate receive count is greater than 2: the object is moved to the
    *    fail bucket and counted as an abandoned message;
    *  - otherwise processing is attempted; on success the object is deleted from the ingest bucket,
    *    on failure it is moved to the fail bucket.
    *
    * The whole body runs inside a `Future` (hence `Future[Future[Unit]]` + `flatten`) so that anything
    * thrown synchronously here (e.g. while building the [[S3IngestObject]]) surfaces as a failed
    * `Future` rather than being thrown at the caller.
    *
    * @param sqsMessage     the message to handle
    * @param basicLogMarker log marker used until the object's details are known; it is then
    *                       extended with those details for all subsequent logging
    * @return a future that completes once the message has been dealt with. Processing failures are
    *         recovered (the object is moved to the fail bucket), so they do not fail this future
    *         unless the recovery handler itself throws.
    */
  private def handleMessageFromIngestBucket(sqsMessage: SQSMessage)(basicLogMarker: LogMarker): Future[Unit] = Future[Future[Unit]] {

    logger.info(basicLogMarker, sqsMessage.toString)

    extractS3KeyFromSqsMessage(sqsMessage) match {
      case Failure(exception) =>
        metrics.failedIngestsFromQueue.increment()
        logger.error(basicLogMarker, "Failed to parse s3 data from SQS message", exception)
        Future.unit

      case Success(key) =>
        val s3IngestObject = S3IngestObject(key, store)(basicLogMarker)

        val isUiUpload = s3IngestObject.maybeMediaIdFromUiUpload.isDefined

        // Enrich the marker with everything known about the object; the media id is only added
        // when one is present.
        implicit val logMarker: LogMarker = basicLogMarker ++ Map(
          "uploadedBy" -> s3IngestObject.uploadedBy,
          "uploadedTime" -> s3IngestObject.uploadTime,
          "contentLength" -> s3IngestObject.contentLength,
          "filename" -> s3IngestObject.filename,
          "isUiUpload" -> isUiUpload,
        ) ++ s3IngestObject.maybeMediaIdFromUiUpload.map("mediaId" -> _).toMap
        val metricDimensions = List(
          new Dimension().withName("UploadedBy").withValue(s3IngestObject.uploadedBy),
          new Dimension().withName("IsUiUpload").withValue(isUiUpload.toString),
        )

        // NOTE(review): presumably the SQS ApproximateReceiveCount attribute — confirm in helper.
        val approximateReceiveCount = getApproximateReceiveCount(sqsMessage)

        // Shared by both "give up without processing" paths: warn, move the object to the fail
        // bucket and, when a media id is present, record the failure against it.
        def moveToFailBucketWithoutProcessing(errorMessage: String): Unit = {
          logger.warn(logMarker, errorMessage)
          store.moveObjectToFailedBucket(s3IngestObject.key)
          s3IngestObject.maybeMediaIdFromUiUpload foreach { imageId =>
            uploadStatusTable.updateStatus( // fire & forget, since there's nothing else we can do
              imageId, UploadStatus(StatusType.Failed, Some(errorMessage))
            )
          }
        }

        // Some(limit) only when a limit is configured AND this object exceeds it; binding the limit
        // here avoids calling `.get` on the option when building the error message.
        val maybeExceededLimitInBytes =
          config.maybeUploadLimitInBytes.filter(_ < s3IngestObject.contentLength)

        maybeExceededLimitInBytes match {
          case Some(limitInBytes) =>
            moveToFailBucketWithoutProcessing(
              s"File size exceeds the maximum allowed size (${limitInBytes / 1_000_000}MB). Moving to fail bucket."
            )
            metrics.failedIngestsFromQueue.incrementBothWithAndWithoutDimensions(metricDimensions)
            Future.unit

          case None if approximateReceiveCount > 2 =>
            // Received more than twice already: stop retrying and abandon the message.
            metrics.abandonedMessagesFromQueue.incrementBothWithAndWithoutDimensions(metricDimensions)
            moveToFailBucketWithoutProcessing(
              s"File processing has been attempted $approximateReceiveCount times. Moving to fail bucket."
            )
            Future.unit

          case None =>
            attemptToProcessIngestedFile(s3IngestObject, isUiUpload)(logMarker) map { digestedFile =>
              metrics.successfulIngestsFromQueue.incrementBothWithAndWithoutDimensions(metricDimensions)
              logger.info(logMarker, s"Successfully processed image ${digestedFile.file.getName}")
              store.deleteObjectFromIngestBucket(s3IngestObject.key)
            } recover {
              // An unsupported mime type is logged at info level only; any other failure is
              // logged as an error with its cause. Both end up in the fail bucket.
              case _: UnsupportedMimeTypeException =>
                metrics.failedIngestsFromQueue.incrementBothWithAndWithoutDimensions(metricDimensions)
                logger.info(logMarker, "Unsupported mime type. Moving straight to fail bucket.")
                store.moveObjectToFailedBucket(s3IngestObject.key)
              case t: Throwable =>
                metrics.failedIngestsFromQueue.incrementBothWithAndWithoutDimensions(metricDimensions)
                logger.error(logMarker, "Failed to process file. Moving to fail bucket.", t)
                store.moveObjectToFailedBucket(s3IngestObject.key)
            }
        }
    }
  }.flatten