in online_archive/src/main/scala/AssetSweeperMessageProcessor.scala [196:230]
def handleNewFile(routingKey: String, msg: Json): Future[Either[String, MessageProcessorReturnValue]] = {
import AssetSweeperNewFile.Codec._ //need to use custom decoder to properly decode message
if(!routingKey.endsWith("new") && !routingKey.endsWith("update")) return Future.failed(SilentDropMessage())
msg.as[AssetSweeperNewFile] match {
case Left(err)=>
Future(Left(s"Could not parse incoming message: $err"))
case Right(newFile)=>
if(newFile.ignore) {
logger.info(s"File ${newFile.filepath}/${newFile.filename} is marked as ignored")
Future.failed(SilentDropMessage(Some("Ignored file")))
} else if(isPreview.unanchored.matches(newFile.filepath)) {
logger.info(s"Filepath ${newFile.filepath} indicates preview files, not archiving")
Future.failed(SilentDropMessage(Some("Preview file")))
} else {
(for {
fullPath <- compositingGetPath(newFile)
projectRecord <- asLookup.assetFolderProjectLookup(fullPath)
result <- processFileAndProject(fullPath, projectRecord)
} yield result).recoverWith({
case err: Throwable =>
val failure = FailureRecord(
None,
Paths.get(newFile.filepath, newFile.filename).toString,
1,
s"Uncaught exception: ${err.getMessage}",
ErrorComponents.Internal,
RetryStates.RanOutOfRetries
)
failureRecordDAO
.writeRecord(failure)
.flatMap(_ => Future.failed(err))
})
}
}
}