app/services/ProxyFrameworkQueue.scala (434 lines of code) (raw):
package services
import java.time.ZonedDateTime
import akka.actor.{ActorRef, ActorSystem}
import akka.stream.{ActorMaterializer, Materializer}
import com.amazonaws.services.sqs.model.{DeleteMessageRequest, ReceiveMessageRequest}
import com.sksamuel.elastic4s.http.{ElasticClient, HttpClient}
import com.theguardian.multimedia.archivehunter.common._
import com.theguardian.multimedia.archivehunter.common.clientManagers.{DynamoClientManager, ESClientManager, S3ClientManager, SQSClientManager}
import com.theguardian.multimedia.archivehunter.common.cmn_models._
import helpers.ProxyLocator
import io.circe.generic.auto._
import javax.inject.{Inject, Singleton}
import models.{AwsSqsMsg, JobReportNew, JobReportStatus, JobReportStatusEncoder}
import org.slf4j.MDC
import play.api.{Configuration, Logger}
import software.amazon.awssdk.regions.Region
import scala.concurrent.{ExecutionContext, Future}
import scala.util.{Failure, Success}
object ProxyFrameworkQueue extends GenericSqsActorMessages {
trait PFQMsg extends SQSMsg
case class HandleSuccess(msg:JobReportNew, jobDesc:JobModel, queueUrl:String, receiptHandle:String, originalSender:ActorRef) extends PFQMsg
case class HandleCheckSetup(msg:JobReportNew, jobDesc:JobModel, queueUrl:String, receiptHandle:String, originalSender:ActorRef) extends PFQMsg
case class HandleGenericSuccess(msg:JobReportNew, jobDesc:JobModel, queueUrl:String, receiptHandle:String, originalSender:ActorRef) extends PFQMsg
case class HandleTranscodingSetup(msg:JobReportNew, jobDesc:JobModel, queueUrl:String, receiptHandle:String, originalSender:ActorRef) extends PFQMsg
case class HandleSuccessfulProxy(msg:JobReportNew, jobDesc:JobModel, queueUrl:String, receiptHandle:String, originalSender:ActorRef) extends PFQMsg
case class HandleSuccessfulAnalyse(msg:JobReportNew, jobDesc:JobModel, queueUrl:String, receiptHandle:String, originalSender:ActorRef) extends PFQMsg
case class UpdateProblemsIndexSuccess(msg:JobReportNew, jobDesc:JobModel, queueUrl:String, receiptHandle:String, originalSender:ActorRef) extends PFQMsg
case class HandleFailure(msg:JobReportNew, jobDesc:JobModel, queueUrl:String, receiptHandle:String, originalSender:ActorRef) extends PFQMsg
case class HandleWarning(msg:JobReportNew, jobDesc:JobModel, queueUrl:String, receiptHandle:String, originalSender:ActorRef) extends PFQMsg
case class HandleRunning(msg:JobReportNew, jobDesc:JobModel, queueUrl:String, receiptHandle:String, originalSender:ActorRef) extends PFQMsg
}
trait ProxyFrameworkQueueFunctions extends ProxyTypeEncoder with JobReportStatusEncoder with MediaMetadataEncoder {
protected val indexer:Indexer
protected val logger:Logger
protected implicit val esClient:ElasticClient
def convertMessageBody(body: String) =
AwsSqsMsg.fromJsonString(body).flatMap(snsMsg=>{
io.circe.parser.parse(snsMsg.Message).flatMap(_.as[JobReportNew]).map(_.copy(timestamp = Some(ZonedDateTime.parse(snsMsg.Timestamp))))
})
}
/**
* Actor that handles messages returned via SQS from the Proxy Framework. Successful processing ends with the message being
* deleted from the queue; an error in processing means that the message will NOT be deleted. The hope is that whatever
* is preventing the message from processing is a transient failure that will succeed at some point in the future.
* Permanent failures are dealt with by a dead-letter queue in SQS.
* This is intended to be instansiated via Guice during the app startup in Module,
* automatically resolving all parameters
*
* @param config Configuration object
* @param system Actor System
* @param sqsClientManager client manager class instance for SQS
* @param s3ClientMgr client manager class instance for S3
* @param dynamoClientMgr client manager class for DynamoDB
* @param jobModelDAO Data Access Object for job models
* @param scanTargetDAO Data Access Object for scan targets
* @param esClientMgr client manager class instance for Elastic Search
* @param proxyLocationDAO Data Access Object for proxy locations
*/
@Singleton
class ProxyFrameworkQueue @Inject() (config: Configuration,
system: ActorSystem,
sqsClientManager: SQSClientManager,
s3ClientMgr: S3ClientManager,
dynamoClientMgr: DynamoClientManager,
jobModelDAO: JobModelDAO,
scanTargetDAO: ScanTargetDAO,
esClientMgr:ESClientManager
)(implicit proxyLocationDAO: ProxyLocationDAO, override val mat:Materializer)
extends GenericSqsActor[JobReportNew] with ProxyFrameworkQueueFunctions {
import ProxyFrameworkQueue._
import GenericSqsActor._
protected val logger = Logger(getClass)
override protected val sqsClient = sqsClientManager.getClient(config.getOptional[String]("externalData.awsProfile"))
override protected implicit val implSystem = system
//override this in testing
protected val ownRef: ActorRef = self
override protected implicit val ec: ExecutionContext = system.dispatcher
override protected val notificationsQueue = config.get[String]("proxyFramework.notificationsQueue")
private implicit val ddbClient = dynamoClientMgr.getNewAsyncDynamoClient(config.getOptional[String]("externalData.awsProfile"))
protected implicit val esClient = esClientMgr.getClient()
protected implicit val indexer = new Indexer(config.get[String]("externalData.indexName"))
lazy val defaultRegion = config.getOptional[String]("externalData.awsRegion").getOrElse("eu-west-1")
protected val problemItemIndexName = config.get[String]("externalData.problemItemsIndex")
protected val problemItemIndexer = new ProblemItemIndexer(problemItemIndexName)
/**
* looks up the ArchiveEntry for the original media associated with the given JobModel
* @param jobDesc JobModel instance
* @param esClient implicitly provided ElasticSearch HttpClient
* @param indexer implicitly provided Indexer instance
* @param ec implicitly provided ExecutionContext
* @return a Future, containing either Left with an error string or Right with an ArchiveEntry
*/
def thumbnailJobOriginalMedia(jobDesc:JobModel) = jobDesc.sourceType match {
case SourceType.SRC_MEDIA=>
logger.debug(s"Getting original media for $jobDesc")
indexer.getById(jobDesc.sourceId)
.map(result=>{
logger.debug(s"Original media for ${jobDesc.jobId} is $result")
result
})
.map(result=>Right(result))
case SourceType.SRC_PROXY=>
Future(Left("need original media!"))
case SourceType.SRC_THUMBNAIL=>
Future(Left("need original media!"))
}
/**
* looks up the ArchiveEntry referenced by the job model and executes the provided block on it if the lookup succeeds.
* If not, updates the job to a failed status and signals the sender that the operation failed, but does not delete the
* message from the queue
* @param msg
* @param jobDesc
* @param rq
* @param receiptHandle
* @param originalSender
* @param block
* @return
*/
def withArchiveEntry(msg:JobReportNew, jobDesc:JobModel, queueUrl:String, receiptHandle:String, originalSender:ActorRef)(block:ArchiveEntry=>Unit) = {
indexer.getById(jobDesc.sourceId)
.map(entry=>block(entry))
.recover({
case err:Throwable=>
logger.error("Could not look up archive entry: ", err)
val updatedLog = jobDesc.log match {
case Some(existingLog) => existingLog + "\n" + s"Could not look up archive entry: ${err.toString}"
case None => s"Could not look up archive entry: ${err.toString}"
}
val updatedJobDesc = jobDesc.copy(jobStatus = JobStatus.ST_ERROR, log = Some(updatedLog))
jobModelDAO.putJob(updatedJobDesc)
ownRef ! HandleFailure(msg, jobDesc, queueUrl, receiptHandle, originalSender)
originalSender ! akka.actor.Status.Failure(err)
})
}
/**
* looks up a ScanTarget waiting for this job and executes the provided block on it (within a Future) if the lookup succeeds.
* if the lookup does not succeed then mark the job as failed.
* @param msg JobReportNew describing the results of the job from the proxy framework
* @param jobDesc JobModel from the database
* @param rq ReceiveMessageRequest object from SQS, this is passed straight on to the block
* @param receiptHandle receipt handle from SQS, to ensure that the message is deleted after successful processing
* @param originalSender the actor that originally sent us the message. sender() refs get nullified when we go into threads.
* @param block block to call if ScanTarget can be found. Must accept the ScanTarget as its only argument, and return unit (i.e. nothing)
* @return Future of Unit.
*/
def withScanTarget(msg:JobReportNew, jobDesc:JobModel, queueUrl:String, receiptHandle:String, originalSender:ActorRef)(block:ScanTarget=>Unit) =
scanTargetDAO.waitingForJobId(jobDesc.jobId).map({
case Left(errList) =>
logger.error(s"Could not find scan target: $errList")
val updatedLog = jobDesc.log match {
case Some(existingLog) => existingLog + "\n" + s"Database error: $errList"
case None => s"Database error: $errList"
}
val updatedJobDesc = jobDesc.copy(jobStatus = JobStatus.ST_ERROR, log = Some(updatedLog))
jobModelDAO.putJob(updatedJobDesc)
ownRef ! HandleFailure(msg, jobDesc, queueUrl, receiptHandle, originalSender)
originalSender ! akka.actor.Status.Failure(new RuntimeException(s"Could not locate scan target: $errList"))
case Right(None) =>
logger.error(s"No scanTarget is waiting for ${jobDesc.jobId} so nothing to update")
val updatedLog = jobDesc.log match {
case Some(existingLog) => existingLog + "\n" + s"No scanTarget is waiting for ${jobDesc.jobId} so nothing to update"
case None => s"No scanTarget is waiting for ${jobDesc.jobId} so nothing to update"
}
val updatedJobDesc = jobDesc.copy(jobStatus = JobStatus.ST_ERROR, log = Some(updatedLog))
jobModelDAO.putJob(updatedJobDesc)
ownRef ! HandleFailure(msg, jobDesc, queueUrl, receiptHandle, originalSender)
originalSender ! akka.actor.Status.Failure(new RuntimeException(s"No scanTarget is waiting for ${jobDesc.jobId} so nothing to update"))
case Right(Some(scanTarget)) =>
block(scanTarget)
})
/**
* update the proxy location in the database
* @param proxyUri new proxy location URI
* @param archiveEntry ArchiveEntry instance of the media that this proxy is for
* @return a Future with either an error string or a success containing the updated record (if available)
*/
def updateProxyRef(proxyUri:String, archiveEntry:ArchiveEntry, proxyType:ProxyType.Value) = {
logger.debug(s"updateProxyRef: got $proxyUri in with archive entry in region ${archiveEntry.region}")
implicit val s3Client = s3ClientMgr.getS3Client(config.getOptional[String]("externalData.awsProfile"), archiveEntry.region.map(Region.of))
ProxyLocation.fromS3(
proxyUri = proxyUri,
mainMediaUri = s"s3://${archiveEntry.bucket}/${archiveEntry.path}",
proxyType = Some(proxyType),
region = Region.of(archiveEntry.region.getOrElse(config.get[String]("externalData.awsRegion")))
)
.flatMap({
case Left(err) =>
logger.error(s"Could not get proxy location: $err")
Future(Left(err))
case Right(proxyLocation) =>
logger.info("Saving proxy location...")
proxyLocationDAO
.saveProxy(proxyLocation)
.map(_=>Right(proxyLocation))
.recover({
case err:Throwable=>
logger.error(s"Could not save proxy location for file id ${proxyLocation.fileId}: ${err.getMessage}", err)
Left(err.getMessage)
})
})
}
override def receive: Receive = {
/**
* update problem items index to show that an item has been successfully proxied
*/
case UpdateProblemsIndexSuccess(msg, jobDesc, rq, receiptHandle, originalSender)=>
logger.info("Updating problems index: ")
jobDesc.jobType.toLowerCase() match {
case "proxy" =>
msg.proxyType match {
case Some(proxyType) =>
logger.debug(s"Updating problem item for ${proxyType.toString}")
problemItemIndexer.getById(jobDesc.sourceId).map(problemItem => {
val entry = problemItem.copyExcludingResult(proxyType)
if(entry.verifyResults.nonEmpty) {
problemItemIndexer.indexSingleItem(entry)
} else {
problemItemIndexer.deleteEntry(entry)
}
///don't send to originalSender as we are not on the critical path
//originalSender ! akka.actor.Status.Success
}).recover({
case err: Throwable =>
logger.warn(s"Could not update problems index for $jobDesc: ", err)
//originalSender ! akka.actor.Status.Failure(err)
})
case None =>
logger.warn(s"Can't update problems index for proxy if there is no proxy type")
//originalSender ! akka.actor.Status.Failure(new RuntimeException("Can't update problems index for proxy if there is no proxy type"))
}
case "thumbnail" =>
logger.debug("Updating problem item for thumbnail")
problemItemIndexer.getById(jobDesc.sourceId).map(item => {
val entry = item.copyExcludingResult(ProxyType.THUMBNAIL)
if(entry.verifyResults.nonEmpty) {
problemItemIndexer.indexSingleItem(entry)
} else {
problemItemIndexer.deleteEntry(entry)
}
})
case _=>
logger.warn("Can't update problems index for non proxy jobs")
//originalSender ! akka.actor.Status.Failure(new RuntimeException("Can't update problems index for non proxy jobs"))
}
/**
* if a proxy job completed successfully, then update the proxy location table with the newly generated proxy
*/
case HandleSuccessfulProxy(msg, jobDesc, rq, receiptHandle, originalSender)=>
logger.info(s"Outboard process indicated job success: $msg")
if(msg.output.isEmpty){
logger.error(s"Message $msg logged as success but had no 'output' field!")
originalSender ! akka.actor.Status.Failure(new RuntimeException(s"Message logged as success but had no 'output' field!"))
} else {
val proxyType = msg.proxyType match {
case Some(pt)=>pt
case None =>
logger.warn("No proxy type information from transcode framework, guessing based on job type")
jobDesc.jobType.toLowerCase match {
case "thumbnail" => ProxyType.THUMBNAIL
case "proxy" => ProxyType.VIDEO
}
}
//set up parallel jobs, one to update dynamo, one to update elasticsearch
val proxyUpdateFuture = thumbnailJobOriginalMedia(jobDesc).flatMap({
case Left(err) => Future(Left(err))
case Right(archiveEntry) => updateProxyRef(msg.output.get, archiveEntry, proxyType)
})
val indexUpdateFuture = ProxyLocator.setProxiedWithRetry(jobDesc.sourceId)
//set up a future to complete when both of the update jobs have run. this marshals a single success/failure flag
val overallCompletionFuture = Future.sequence(Seq(proxyUpdateFuture, indexUpdateFuture)).map(results=>{
val errors = results.collect({case Left(err)=>err})
if(errors.nonEmpty){
logger.error(s"Could not update proxy: $errors")
Left(errors.mkString(","))
} else {
results.head
}
})
//handle overall success/failure
overallCompletionFuture.map({
case Left(err) =>
logger.error(s"Could not update proxy: $err")
val updatedJd = jobDesc.copy(completedAt = Some(ZonedDateTime.now), log = Some(s"Could not update proxy: $err"), jobStatus = JobStatus.ST_ERROR)
jobModelDAO.putJob(updatedJd).onComplete({
case Success(_) =>
originalSender ! akka.actor.Status.Failure(new RuntimeException(s"Could not update proxy: $err"))
case Failure(dbErr) =>
originalSender ! akka.actor.Status.Failure(dbErr)
})
case Right(_) =>
ownRef ! UpdateProblemsIndexSuccess(msg, jobDesc, rq, receiptHandle, originalSender)
ownRef ! HandleGenericSuccess(msg, jobDesc, rq, receiptHandle, originalSender)
}).recover({
case err:Throwable=>
logger.error("Could not update proxy: ", err)
originalSender ! akka.actor.Status.Failure(err)
})
}
/**
* handle response message from a "check setup" or "setup transcoder" request
*/
case HandleCheckSetup(msg, jobDesc, rq, receiptHandle, originalSender)=>
withScanTarget(msg, jobDesc, rq, receiptHandle, originalSender) {scanTarget=>
val actualStatus = msg.status match {
case JobReportStatus.SUCCESS=>JobStatus.ST_SUCCESS
case JobReportStatus.FAILURE=>JobStatus.ST_ERROR
case JobReportStatus.RUNNING=>JobStatus.ST_RUNNING
case JobReportStatus.WARNING=>JobStatus.ST_RUNNING
}
val updatedPendingJobIds = scanTarget.pendingJobIds.map(_.filter(value=>value!=jobDesc.jobId))
val tcReport = TranscoderCheck(ZonedDateTime.now(),actualStatus,msg.decodedLog.collect({
case Left(err)=>err
case Right(msg)=>msg
}))
val updatedScanTarget = scanTarget.copy(pendingJobIds = updatedPendingJobIds, transcoderCheck = Some(tcReport))
scanTargetDAO.put(updatedScanTarget)
val updatedJobDesc = jobDesc.copy(jobStatus = actualStatus, completedAt = Some(ZonedDateTime.now()))
jobModelDAO.putJob(updatedJobDesc).onComplete({
case Success(_)=>
logger.info(s"Updated job ${jobDesc.jobId}")
sqsClient.deleteMessage(new DeleteMessageRequest().withQueueUrl(rq).withReceiptHandle(receiptHandle))
case Failure(err)=>
logger.error(s"Could not update job ${jobDesc.jobId}: ${err.getMessage}", err)
})
if(actualStatus==JobStatus.ST_ERROR){
ownRef ! HandleFailure(msg, jobDesc, rq, receiptHandle, originalSender)
} else {
originalSender ! akka.actor.Status.Success
}
}
/**
* Handle the response message from a successful "analyse", i.e. determination of system metadata,
* by pushing the captured metadata into the index record
*/
case HandleSuccessfulAnalyse(msg, jobDesc, rq, receiptHandle, originalSender)=>
withArchiveEntry(msg, jobDesc, rq, receiptHandle, originalSender) { entry=>
logger.info(s"Received updated metadata ${msg.metadata} for ${entry.id}")
val updatedEntry = entry.copy(mediaMetadata = msg.metadata)
indexer.indexSingleItem(updatedEntry).map({
case Left(err)=>
MDC.put("entry",updatedEntry.toString)
MDC.put("error", err.toString)
logger.error(s"Could not update index: $err")
val updatedLog = jobDesc.log match {
case Some(existingLog) => existingLog + "\n" + s"Could not update index: ${err.toString}"
case None => s"Could not update index: ${err.toString}"
}
val updatedJobDesc = jobDesc.copy(jobStatus = JobStatus.ST_ERROR, log = Some(updatedLog))
jobModelDAO.putJob(updatedJobDesc)
ownRef ! HandleFailure(msg, jobDesc, rq, receiptHandle, originalSender)
originalSender ! akka.actor.Status.Failure(new RuntimeException(err.toString))
case Right(newId)=>
logger.info(s"Updated media metadata for $newId")
ownRef ! HandleGenericSuccess(msg, jobDesc, rq, receiptHandle, originalSender)
})
}
/**
* generic stuff for handling success messages - update the job to show completion and logs, then delete the incoming
* message from SQS
*/
case HandleGenericSuccess(msg, jobDesc, rq, receiptHandle, originalSender)=>
logger.info(s"HandleGenericSuccess for $jobDesc")
val updatedJob = jobDesc.copy(jobStatus = JobStatus.ST_SUCCESS, completedAt = Some(ZonedDateTime.now()), log=msg.decodedLog.collect({
case Left(err)=>s"$err for ${msg.log}"
case Right(log)=>log
}))
jobModelDAO.putJob(updatedJob).map(_=> {
logger.info(s"Job ${jobDesc.jobId} updated")
sqsClient.deleteMessage(new DeleteMessageRequest().withQueueUrl(rq).withReceiptHandle(receiptHandle))
originalSender ! akka.actor.Status.Success
}).recover({
case err:Throwable=>
logger.error(s"Could not update job description: $err")
originalSender ! akka.actor.Status.Failure(new RuntimeException(err.toString))
})
/**
* route a success message to the appropriate handler
*/
case HandleSuccess(msg, jobDesc, rq, receiptHandle, originalSender)=>
logger.debug(s"Got success for jobDesc $jobDesc")
jobDesc.jobType match {
case "proxy"=>self ! HandleSuccessfulProxy(msg, jobDesc, rq, receiptHandle, originalSender)
case "thumbnail"=>self ! HandleSuccessfulProxy(msg, jobDesc, rq, receiptHandle, originalSender)
case "analyse"=>self ! HandleSuccessfulAnalyse(msg, jobDesc, rq, receiptHandle, originalSender)
case "Analyse"=>self ! HandleSuccessfulAnalyse(msg, jobDesc, rq, receiptHandle, originalSender)
case _=>
logger.error(s"Don't know how to handle job type ${jobDesc.jobType} coming back from transcoder")
originalSender ! akka.actor.Status.Failure(new RuntimeException(s"Don't know how to handle job type ${jobDesc.jobType} coming back from transcoder"))
}
/**
* if a proxy job started running, update the job status in the database
*/
case HandleRunning(msg, jobDesc, rq, receiptHandle, originalSender)=>
val updatedJd = jobDesc.copy(jobStatus = JobStatus.ST_RUNNING)
jobModelDAO.putJob(updatedJd).onComplete({
case Success(_)=>
sqsClient.deleteMessage(new DeleteMessageRequest().withQueueUrl(rq).withReceiptHandle(receiptHandle))
originalSender ! akka.actor.Status.Success
case Failure(err)=>
logger.error("Could not update job model in database", err)
originalSender ! akka.actor.Status.Failure(err)
})
case HandleWarning(msg, jobDesc, rq, receiptHandle, originalSender)=>
val updatedJd = jobDesc.copy(completedAt = Some(ZonedDateTime.now), jobStatus = JobStatus.ST_WARNING, log = msg.decodedLog.map(_.fold(
err=>s"Could not decode job log ${msg.log}: $err",
logContent=>logContent
)))
val proxyType = msg.proxyType match {
case Some(pt)=>pt
case None =>
logger.warn("No proxy type information from transcode framework, guessing based on job type")
jobDesc.jobType.toLowerCase match {
case "thumbnail" => ProxyType.THUMBNAIL
case "proxy" => ProxyType.VIDEO
}
}
val proxyUpdateFuture = msg.output match {
case Some(output) => //if msg.output is set, then we can still set the proxy even though the state is warning.
thumbnailJobOriginalMedia(jobDesc).flatMap({
case Left(err) => Future(Left(err))
case Right(archiveEntry) => updateProxyRef(msg.output.get, archiveEntry, proxyType)
})
case None => Future(Right(None)) //if there is no output there's nothing to update, but no failure either.
}
proxyUpdateFuture.map({
case Left(err) =>
logger.error(s"Could not update proxy: $err")
val updatedJd = jobDesc.copy(completedAt = Some(ZonedDateTime.now), log = Some(s"Could not update proxy: $err"), jobStatus = JobStatus.ST_ERROR)
jobModelDAO.putJob(updatedJd).onComplete({
case Success(_) =>
logger.error(s"could not update proxy: $err")
originalSender ! akka.actor.Status.Failure(new RuntimeException(s"Could not update proxy: $err"))
case Failure(dbErr) =>
logger.error(s"job table update failed: ", dbErr)
originalSender ! akka.actor.Status.Failure(dbErr)
})
case Right(_) =>
jobModelDAO.putJob(updatedJd).onComplete({
case Success(_)=>
sqsClient.deleteMessage(new DeleteMessageRequest().withQueueUrl(rq).withReceiptHandle(receiptHandle))
originalSender ! akka.actor.Status.Success
case Failure(err)=>
logger.error("Could not update job model in database", err)
originalSender ! akka.actor.Status.Failure(err)
})
}).recover({
case err:Throwable=>
logger.error("Could not update proxy: ", err)
originalSender ! akka.actor.Status.Failure(err)
})
/**
* if a proxy job failed, then record this in the database along with any decoded log
*/
case HandleFailure(msg, jobDesc, rq, receiptHandle, originalSender)=>
val updatedJd = jobDesc.copy(completedAt = Some(ZonedDateTime.now),
jobStatus = JobStatus.ST_ERROR,
log=msg.decodedLog.map(_.fold(
err=>s"Could not decode job log: $err",
logContent=>logContent
))
)
jobModelDAO.putJob(updatedJd).onComplete({
case Success(_)=>
sqsClient.deleteMessage(new DeleteMessageRequest().withQueueUrl(rq).withReceiptHandle(receiptHandle))
originalSender ! akka.actor.Status.Success
case Failure(err)=>
logger.error("Could not update job model in database", err)
originalSender ! akka.actor.Status.Failure(err)
})
case HandleDomainMessage(msg: JobReportNew, queueUrl, receiptHandle)=>
logger.debug(s"HandleDomainMessage: $msg")
sender () ! ReadyForNextMessage //replicate old behaviour by immediately acknowledging
val originalSender=sender()
jobModelDAO.jobForId(msg.jobId).map({
case None =>
logger.error(s"Could not process $msg: No job found for ${msg.jobId}")
if(msg.jobId == "test-job"){
//delete out "test-job" which are used for manual testing
sqsClient.deleteMessage(new DeleteMessageRequest().withQueueUrl(queueUrl).withReceiptHandle(receiptHandle))
}
originalSender ! akka.actor.Status.Failure(new RuntimeException(s"Could not process $msg: No job found for ${msg.jobId}"))
case Some(Left(err)) =>
logger.error(s"Could not look up job for $msg: ${err.toString}")
originalSender ! akka.actor.Status.Failure(new RuntimeException(s"Could not look up job for $msg: ${err.toString}"))
case Some(Right(jobDesc)) =>
logger.debug(s"Got jobDesc $jobDesc")
logger.debug(s"jobType: ${jobDesc.jobType} jobReportStatus: ${msg.status}")
if(isMessageOutOfDate(jobDesc.lastUpdatedTS, msg.timestamp)){
logger.info(s"Received outdated message update: job ${jobDesc.jobId} had ${jobDesc.lastUpdatedTS}, message had ${msg.timestamp}")
sqsClient.deleteMessage(new DeleteMessageRequest().withQueueUrl(queueUrl).withReceiptHandle(receiptHandle))
} else {
val updatedJobDesc = if(msg.timestamp.isDefined) jobDesc.copy(lastUpdatedTS = msg.timestamp) else jobDesc
if (updatedJobDesc.jobType == "CheckSetup" || updatedJobDesc.jobType == "SetupTranscoding") {
ownRef ! HandleCheckSetup(msg, updatedJobDesc, queueUrl, receiptHandle, originalSender)
} else {
msg.status match {
case JobReportStatus.SUCCESS => ownRef ! HandleSuccess(msg, updatedJobDesc, queueUrl, receiptHandle, originalSender)
case JobReportStatus.FAILURE => ownRef ! HandleFailure(msg, updatedJobDesc, queueUrl, receiptHandle, originalSender)
case JobReportStatus.RUNNING => ownRef ! HandleRunning(msg, updatedJobDesc, queueUrl, receiptHandle, originalSender)
case JobReportStatus.WARNING => ownRef ! HandleWarning(msg, updatedJobDesc, queueUrl, receiptHandle, originalSender)
}
}
}
}).recover({
case err:Throwable=>
logger.error(s"Could not look up job for $msg in database: ${err.toString}", err)
})
case other:GenericSqsActor.SQSMsg => handleGeneric(other)
}
/**
* check whether the given message is out of date, i.e. the db record has been updated by a message sent after this one.
* @param maybeJobLast timestamp of the last update message to the job
* @param maybeMsgTimestamp timestamp of this update message
* @return Boolean indicating whether the message is out of date. true if it is, and should be discarded; false if it should be procesed.
*/
private def isMessageOutOfDate(maybeJobLast:Option[ZonedDateTime], maybeMsgTimestamp:Option[ZonedDateTime]) = {
maybeMsgTimestamp match {
case None=>
logger.warn("Got message with no timestamp? can't determine if message is outdated.")
false
case Some(msgTimestamp)=>
maybeJobLast match {
case None=>
logger.debug("Job has no timestamp, so we must be first")
false
case Some(jobLast)=>
msgTimestamp.isBefore(jobLast)
}
}
}
}