in app/services/IngestProxyQueue.scala [70:210]
override def convertMessageBody(body: String): Either[io.circe.Error, IngestMessage] =
io.circe.parser.parse(body).flatMap(_.as[IngestMessage])
override def receive: Receive = {
case StartAnalyse(entry) =>
val originalSender = sender()
proxyGenerators.requestMetadataAnalyse(entry,defaultRegion).onComplete({
case Success(Left(problem))=>
logger.error(s"Could not request analyse for $entry: $problem")
originalSender ! Status.Failure(new RuntimeException(problem))
case Success(Right(status))=>
logger.info(s"Started metadata analyse: $status")
originalSender ! Status.Success
case Failure(err)=>
logger.error("Metadata request thread failed: ", err)
originalSender ! Status.Failure(err)
})
case CheckRegisteredThumb(entry) =>
val originalSender = sender()
proxyLocationDAO.getProxy(entry.id, ProxyType.THUMBNAIL).map({
case Some(proxyLocation) =>
logger.info(s"${entry.bucket}:${entry.path} already has a registered thumbnail at $proxyLocation")
originalSender ! Status.Success
case None =>
logger.info(s"${entry.bucket}:${entry.path} has no registered thumbnail")
ownRef ! CheckNonRegisteredThumb(entry)
}).onComplete({
case Success(_) => ()
case Failure(err) =>
logger.error("Could not look up proxy data: ", err)
originalSender ! Status.Failure
})
case CheckNonRegisteredThumb(entry) =>
val originalSender = sender()
implicit val s3Client = s3ClientMgr.getS3Client(config.getOptional[String]("externalData.awsProfile"), entry.region.map(Region.of))
ProxyLocator.findProxyLocation(entry).map(results => {
val foundProxies = results.collect({ case Right(loc) => loc }).filter(loc => loc.proxyType == ProxyType.THUMBNAIL)
if (foundProxies.isEmpty) {
logger.info(s"${entry.bucket}:${entry.path} has no locatable thumbnails in expected locations. Generating a new one...")
ownRef ! CreateNewThumbnail(entry)
} else {
logger.info(s"${entry.bucket}:${entry.path}: Found existing potential thumbnails: $foundProxies")
//add given items to the proxies table for the item and then update the index record to say it's proxied
Future
.sequence(foundProxies.map(proxyLocationDAO.saveProxy))
.map(results => {
ProxyLocator.setProxiedWithRetry(entry.id)
results
})
}
}).onComplete({
case Success(_) => ()
case Failure(err) =>
logger.error(s"${entry.bucket}:${entry.path}: Could not run proxy location find: ", err)
originalSender ! Status.Failure
})
case CreateNewThumbnail(entry) =>
val originalSender = sender()
proxyGenerators.requestProxyJob(RequestType.THUMBNAIL,entry,None).onComplete({
case Success(Success(result)) => //thread completed and we got a result
logger.info(s"${entry.bucket}:${entry.path}: started thumbnailing with ECS id $result")
originalSender ! Status.Success
case Success(Failure(err)) => //thread completed OK but we did not start a job
logger.error(s"${entry.bucket}:${entry.path}: Could not start thumbnailing:", err)
originalSender ! Status.Failure
case Failure(err) =>
logger.error(s"${entry.bucket}:${entry.path}: thumbnailing thread failed", err)
originalSender ! Status.Failure
})
case CheckNonRegisteredProxy(entry) =>
val originalSender = sender()
implicit val s3Client = s3ClientMgr.getS3Client(config.getOptional[String]("externalData.awsProfile"), entry.region.map(Region.of))
ProxyLocator.findProxyLocation(entry).flatMap(results => {
val foundProxies = results.collect({ case Right(loc) => loc }).filter(loc => loc.proxyType != ProxyType.THUMBNAIL)
if (foundProxies.isEmpty) {
logger.info(s"${entry.bucket}:${entry.path} has no locatable proxies in expected locations. Generating a new one...")
proxyGenerators.defaultProxyType(entry) match {
case Some(proxyType) =>
proxyGenerators.requestProxyJob(RequestType.PROXY, entry, Some(proxyType)).map(result=>Seq(result))
case None=>
logger.error(s"No default proxy type available for ${entry.bucket}:${entry.path} (${entry.mimeType.toString})")
throw new RuntimeException("No default proxy type available")
}
} else {
logger.info(s"${entry.bucket}:${entry.path} has unregistered proxies: $foundProxies")
Future
.sequence(foundProxies.map(proxyLocationDAO.saveProxy))
.map(results => {
ProxyLocator.setProxiedWithRetry(entry.id)
results
})
}
}).onComplete({
case Success(_) => ()
case Failure(err) =>
logger.error(s"${entry.bucket}:${entry.path}: findProxyLocation failed: ", err)
originalSender ! Status.Failure
})
case CheckRegisteredProxy(entry) =>
val possibleProxyTypes = Seq(ProxyType.AUDIO, ProxyType.VIDEO)
val originalSender = sender()
Future.sequence(possibleProxyTypes.map(pt => proxyLocationDAO.getProxy(entry.id, pt))).map(results => {
val validProxies = results.collect({ case Some(proxyLocation) => proxyLocation })
if (validProxies.isEmpty) {
logger.info(s"${entry.bucket}:${entry.path} has no known proxies, checking for loose...")
ownRef ! CheckNonRegisteredProxy(entry)
} else {
logger.info(s"${entry.bucket}:${entry.path} has these known proxies: $validProxies")
originalSender ! Status.Success
}
}).onComplete({
case Success(_) =>
()
case Failure(err) =>
logger.error("Could not check for existing proxies", err)
originalSender ! Status.Failure
})
case HandleDomainMessage(finalMsg:IngestMessage, queueUrl, receiptHandle)=>
logger.info(s"Received notification of new item: ${finalMsg.archiveEntry}")
indexer.getById(finalMsg.archiveEntry.id).onComplete({
case Success(entry)=>
logger.info(s"DEBUGGING - indexed entry for ${finalMsg.archiveEntry.id} was $entry at point of receive")
case Failure(err)=>
logger.error(s"Could not check indexed entry for ${finalMsg.archiveEntry.id}: ${err.getMessage}", err)
})
ownRef ! CheckRegisteredThumb(finalMsg.archiveEntry)
ownRef ! CheckRegisteredProxy(finalMsg.archiveEntry)
ownRef ! StartAnalyse(finalMsg.archiveEntry)
sqsClient.deleteMessage(new DeleteMessageRequest().withQueueUrl(queueUrl).withReceiptHandle(receiptHandle))
ownRef ! ReadyForNextMessage //tell the superclass we are ready for the next message
case other:GenericSqsActor.SQSMsg => handleGeneric(other)
}