def handleNewFile()

in online_archive/src/main/scala/AssetSweeperMessageProcessor.scala [196:230]


  /**
    * Handles an incoming asset sweeper message about a new or updated file.
    *
    * Outcomes, in the order they are checked:
    *  - routing key does not end in "new" or "update": failed Future carrying `SilentDropMessage()`
    *  - message body cannot be decoded as `AssetSweeperNewFile`: successful Future carrying a `Left` with the decode error
    *  - file is flagged `ignore`, or its filepath matches `isPreview`: failed Future carrying a `SilentDropMessage`
    *  - otherwise: the full path is resolved with `compositingGetPath`, the project is looked up with
    *    `asLookup.assetFolderProjectLookup`, and the result of `processFileAndProject` is returned.
    *
    * If any step of that last chain fails, a `FailureRecord` is written through `failureRecordDAO` and the
    * returned Future fails with the original exception. A failure to write the record is logged and does not
    * replace the original exception.
    *
    * @param routingKey routing key the message arrived with
    * @param msg        message body as parsed JSON
    * @return a Future with either an error description (Left) or the processing result (Right)
    */
  def handleNewFile(routingKey: String, msg: Json): Future[Either[String, MessageProcessorReturnValue]] = {
    import AssetSweeperNewFile.Codec._  //need to use custom decoder to properly decode message
    val isRelevantKey = routingKey.endsWith("new") || routingKey.endsWith("update")

    if(!isRelevantKey) {
      Future.failed(SilentDropMessage())
    } else {
      msg.as[AssetSweeperNewFile] match {
        case Left(err)=>
          //the value is already computed, so there is no need to schedule a task for it
          Future.successful(Left(s"Could not parse incoming message: $err"))
        case Right(newFile)=>
          if(newFile.ignore) {
            logger.info(s"File ${newFile.filepath}/${newFile.filename} is marked as ignored")
            Future.failed(SilentDropMessage(Some("Ignored file")))
          } else if(isPreview.unanchored.matches(newFile.filepath)) {
            logger.info(s"Filepath ${newFile.filepath} indicates preview files, not archiving")
            Future.failed(SilentDropMessage(Some("Preview file")))
          } else {
            (for {
              fullPath <- compositingGetPath(newFile)
              projectRecord <- asLookup.assetFolderProjectLookup(fullPath)
              result <- processFileAndProject(fullPath, projectRecord)
            } yield result).recoverWith({
              // NOTE(review): this also records a failure for a SilentDropMessage raised by the steps above, if
              // they can raise one - confirm that is intended.
              case err: Throwable =>
                val failedPath = Paths.get(newFile.filepath, newFile.filename).toString
                //getMessage can legitimately be null; fall back to toString so the record stays informative
                val description = Option(err.getMessage).getOrElse(err.toString)
                val failure = FailureRecord(
                  None,
                  failedPath,
                  1,  // presumably the attempt count - TODO confirm against FailureRecord
                  s"Uncaught exception: $description",
                  ErrorComponents.Internal,
                  RetryStates.RanOutOfRetries
                )
                failureRecordDAO
                  .writeRecord(failure)
                  .transformWith({ writeOutcome =>
                    //always surface the original error, even if the failure record could not be written
                    writeOutcome.failed.foreach(writeErr =>
                      logger.error(s"Could not write failure record for $failedPath: ${writeErr.getMessage}", writeErr)
                    )
                    Future.failed(err)
                  })
            })
          }
      }
    }
  }