app/services/IngestProxyQueue.scala (180 lines of code) (raw):
package services
import akka.actor.{Actor, ActorRef, ActorSystem, Status}
import akka.stream.{ActorMaterializer, Materializer}
import io.circe.syntax._
import io.circe.generic.auto._
import com.amazonaws.services.sqs.model.{DeleteMessageRequest, ReceiveMessageRequest}
import com.theguardian.multimedia.archivehunter.common.ProxyTranscodeFramework.{ProxyGenerators, RequestType}
import com.theguardian.multimedia.archivehunter.common.{cmn_models, _}
import com.theguardian.multimedia.archivehunter.common.clientManagers.{DynamoClientManager, ESClientManager, S3ClientManager, SQSClientManager}
import com.theguardian.multimedia.archivehunter.common.cmn_models.{IngestMessage, ScanTargetDAO}
import helpers.ProxyLocator
import javax.inject.{Inject, Named, Singleton}
import models.AwsSqsMsg
import play.api.{Configuration, Logger}
import software.amazon.awssdk.regions.Region
import scala.collection.JavaConverters._
import scala.concurrent.{ExecutionContext, Future}
import scala.util.{Failure, Success}
object IngestProxyQueue extends GenericSqsActorMessages {
trait IPQMsg extends SQSMsg
case class CheckRegisteredProxy(entry:ArchiveEntry) extends IPQMsg
case class CheckNonRegisteredProxy(entry:ArchiveEntry) extends IPQMsg
case class CheckRegisteredThumb(entry:ArchiveEntry) extends IPQMsg
case class CheckNonRegisteredThumb(entry: ArchiveEntry) extends IPQMsg
case class CreateNewThumbnail(entry:ArchiveEntry) extends IPQMsg
case class StartAnalyse(entry:ArchiveEntry) extends IPQMsg
}
@Singleton
class IngestProxyQueue @Inject()(config: Configuration,
system: ActorSystem,
sqsClientManager: SQSClientManager,
proxyGenerators: ProxyGenerators,
s3ClientMgr: S3ClientManager,
dynamoClientMgr: DynamoClientManager,
esClientMgr:ESClientManager,
)(implicit scanTargetDAO: ScanTargetDAO, proxyLocationDAO: ProxyLocationDAO, override val mat:Materializer)
extends GenericSqsActor[IngestMessage] with ZonedDateTimeEncoder with StorageClassEncoder {
import IngestProxyQueue._
import GenericSqsActor._
private val logger = Logger(getClass)
lazy override protected val sqsClient = sqsClientManager.getClient(config.getOptional[String]("externalData.awsProfile"))
override protected implicit val implSystem = system
private implicit val esClient = esClientMgr.getClient()
lazy protected implicit val indexer = new Indexer(config.get[String]("externalData.indexName"))
//override this in testing
protected val ownRef: ActorRef = self
override protected implicit val ec: ExecutionContext = system.dispatcher
override protected val notificationsQueue = config.get[String]("ingest.notificationsQueue")
private implicit val ddbClient = dynamoClientMgr.getNewAsyncDynamoClient(config.getOptional[String]("externalData.awsProfile"))
lazy val defaultRegion = config.getOptional[String]("externalData.awsRegion").getOrElse("eu-west-1")
override def convertMessageBody(body: String): Either[io.circe.Error, IngestMessage] =
io.circe.parser.parse(body).flatMap(_.as[IngestMessage])
override def receive: Receive = {
case StartAnalyse(entry) =>
val originalSender = sender()
proxyGenerators.requestMetadataAnalyse(entry,defaultRegion).onComplete({
case Success(Left(problem))=>
logger.error(s"Could not request analyse for $entry: $problem")
originalSender ! Status.Failure(new RuntimeException(problem))
case Success(Right(status))=>
logger.info(s"Started metadata analyse: $status")
originalSender ! Status.Success
case Failure(err)=>
logger.error("Metadata request thread failed: ", err)
originalSender ! Status.Failure(err)
})
case CheckRegisteredThumb(entry) =>
val originalSender = sender()
proxyLocationDAO.getProxy(entry.id, ProxyType.THUMBNAIL).map({
case Some(proxyLocation) =>
logger.info(s"${entry.bucket}:${entry.path} already has a registered thumbnail at $proxyLocation")
originalSender ! Status.Success
case None =>
logger.info(s"${entry.bucket}:${entry.path} has no registered thumbnail")
ownRef ! CheckNonRegisteredThumb(entry)
}).onComplete({
case Success(_) => ()
case Failure(err) =>
logger.error("Could not look up proxy data: ", err)
originalSender ! Status.Failure
})
case CheckNonRegisteredThumb(entry) =>
val originalSender = sender()
implicit val s3Client = s3ClientMgr.getS3Client(config.getOptional[String]("externalData.awsProfile"), entry.region.map(Region.of))
ProxyLocator.findProxyLocation(entry).map(results => {
val foundProxies = results.collect({ case Right(loc) => loc }).filter(loc => loc.proxyType == ProxyType.THUMBNAIL)
if (foundProxies.isEmpty) {
logger.info(s"${entry.bucket}:${entry.path} has no locatable thumbnails in expected locations. Generating a new one...")
ownRef ! CreateNewThumbnail(entry)
} else {
logger.info(s"${entry.bucket}:${entry.path}: Found existing potential thumbnails: $foundProxies")
//add given items to the proxies table for the item and then update the index record to say it's proxied
Future
.sequence(foundProxies.map(proxyLocationDAO.saveProxy))
.map(results => {
ProxyLocator.setProxiedWithRetry(entry.id)
results
})
}
}).onComplete({
case Success(_) => ()
case Failure(err) =>
logger.error(s"${entry.bucket}:${entry.path}: Could not run proxy location find: ", err)
originalSender ! Status.Failure
})
case CreateNewThumbnail(entry) =>
val originalSender = sender()
proxyGenerators.requestProxyJob(RequestType.THUMBNAIL,entry,None).onComplete({
case Success(Success(result)) => //thread completed and we got a result
logger.info(s"${entry.bucket}:${entry.path}: started thumbnailing with ECS id $result")
originalSender ! Status.Success
case Success(Failure(err)) => //thread completed OK but we did not start a job
logger.error(s"${entry.bucket}:${entry.path}: Could not start thumbnailing:", err)
originalSender ! Status.Failure
case Failure(err) =>
logger.error(s"${entry.bucket}:${entry.path}: thumbnailing thread failed", err)
originalSender ! Status.Failure
})
case CheckNonRegisteredProxy(entry) =>
val originalSender = sender()
implicit val s3Client = s3ClientMgr.getS3Client(config.getOptional[String]("externalData.awsProfile"), entry.region.map(Region.of))
ProxyLocator.findProxyLocation(entry).flatMap(results => {
val foundProxies = results.collect({ case Right(loc) => loc }).filter(loc => loc.proxyType != ProxyType.THUMBNAIL)
if (foundProxies.isEmpty) {
logger.info(s"${entry.bucket}:${entry.path} has no locatable proxies in expected locations. Generating a new one...")
proxyGenerators.defaultProxyType(entry) match {
case Some(proxyType) =>
proxyGenerators.requestProxyJob(RequestType.PROXY, entry, Some(proxyType)).map(result=>Seq(result))
case None=>
logger.error(s"No default proxy type available for ${entry.bucket}:${entry.path} (${entry.mimeType.toString})")
throw new RuntimeException("No default proxy type available")
}
} else {
logger.info(s"${entry.bucket}:${entry.path} has unregistered proxies: $foundProxies")
Future
.sequence(foundProxies.map(proxyLocationDAO.saveProxy))
.map(results => {
ProxyLocator.setProxiedWithRetry(entry.id)
results
})
}
}).onComplete({
case Success(_) => ()
case Failure(err) =>
logger.error(s"${entry.bucket}:${entry.path}: findProxyLocation failed: ", err)
originalSender ! Status.Failure
})
case CheckRegisteredProxy(entry) =>
val possibleProxyTypes = Seq(ProxyType.AUDIO, ProxyType.VIDEO)
val originalSender = sender()
Future.sequence(possibleProxyTypes.map(pt => proxyLocationDAO.getProxy(entry.id, pt))).map(results => {
val validProxies = results.collect({ case Some(proxyLocation) => proxyLocation })
if (validProxies.isEmpty) {
logger.info(s"${entry.bucket}:${entry.path} has no known proxies, checking for loose...")
ownRef ! CheckNonRegisteredProxy(entry)
} else {
logger.info(s"${entry.bucket}:${entry.path} has these known proxies: $validProxies")
originalSender ! Status.Success
}
}).onComplete({
case Success(_) =>
()
case Failure(err) =>
logger.error("Could not check for existing proxies", err)
originalSender ! Status.Failure
})
case HandleDomainMessage(finalMsg:IngestMessage, queueUrl, receiptHandle)=>
logger.info(s"Received notification of new item: ${finalMsg.archiveEntry}")
indexer.getById(finalMsg.archiveEntry.id).onComplete({
case Success(entry)=>
logger.info(s"DEBUGGING - indexed entry for ${finalMsg.archiveEntry.id} was $entry at point of receive")
case Failure(err)=>
logger.error(s"Could not check indexed entry for ${finalMsg.archiveEntry.id}: ${err.getMessage}", err)
})
ownRef ! CheckRegisteredThumb(finalMsg.archiveEntry)
ownRef ! CheckRegisteredProxy(finalMsg.archiveEntry)
ownRef ! StartAnalyse(finalMsg.archiveEntry)
sqsClient.deleteMessage(new DeleteMessageRequest().withQueueUrl(queueUrl).withReceiptHandle(receiptHandle))
ownRef ! ReadyForNextMessage //tell the superclass we are ready for the next message
case other:GenericSqsActor.SQSMsg => handleGeneric(other)
}
}