app/controllers/JobController.scala (118 lines of code) (raw):
package controllers
import java.util.UUID
import akka.actor.{ActorRef, ActorSystem}
import akka.stream.{ActorMaterializer, Materializer}
import auth.{BearerTokenAuth, Security}
import com.theguardian.multimedia.archivehunter.common.clientManagers.{DynamoClientManager, ESClientManager, S3ClientManager}
import org.scanamo.DynamoReadError
import com.theguardian.multimedia.archivehunter.common._
import javax.inject.{Inject, Named, Singleton}
import play.api.{Configuration, Logger}
import play.api.libs.circe.Circe
import play.api.mvc.{AbstractController, ControllerComponents}
import io.circe.generic.auto._
import io.circe.syntax._
import com.theguardian.multimedia.archivehunter.common.cmn_models._
import requests.JobSearchRequest
import responses._
import scala.util.{Failure, Success, Try}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._
import scala.concurrent.Future
import com.theguardian.multimedia.archivehunter.common.ProxyTranscodeFramework.ProxyGenerators
import org.slf4j.LoggerFactory
import play.api.cache.SyncCacheApi
@Singleton
class JobController @Inject() (override val config:Configuration,
override val controllerComponents:ControllerComponents,
jobModelDAO: JobModelDAO,
esClientManager: ESClientManager,
s3ClientManager: S3ClientManager,
ddbClientManager:DynamoClientManager,
override val bearerTokenAuth: BearerTokenAuth,
override val cache:SyncCacheApi,
proxyLocationDAO:ProxyLocationDAO,
proxyGenerators:ProxyGenerators)
(implicit actorSystem:ActorSystem, mat:Materializer)
extends AbstractController(controllerComponents) with Circe with JobModelEncoder with ZonedDateTimeEncoder with Security with QueryRemaps {
override protected val logger=LoggerFactory.getLogger(getClass)
private val indexName = config.getOptional[String]("externalData.indexName").getOrElse("archivehunter")
protected implicit val indexer = new Indexer(indexName)
def renderListAction(block: ()=>Future[List[Either[DynamoReadError, JobModel]]]) = IsAuthenticatedAsync { uid=> request=>
val resultFuture = block()
resultFuture.recover({
case ex:Throwable=>
logger.error("Could not render list of jobs: ", ex)
Future(InternalServerError(GenericErrorResponse("error", ex.toString).asJson))
})
resultFuture.map(result => {
val failures = result.collect({ case Left(err) => err })
if (failures.nonEmpty) {
logger.error(s"Can't list all jobs: $failures")
InternalServerError(GenericErrorResponse("error", failures.map(_.toString).mkString(", ")).asJson)
} else {
Ok(ObjectListResponse("ok", "job", result.collect({ case Right(job) => job }), result.length).asJson)
}
})
}
def getAllJobs(limit:Int, scanFrom:Option[String]) = renderListAction(()=>jobModelDAO.allJobs(limit))
def jobsFor(fileId:String) = renderListAction(()=>jobModelDAO.jobsForSource(fileId))
def getJob(jobId:String) = IsAuthenticatedAsync { uid=> request=>
jobModelDAO.jobForId(jobId).map({
case None=>
NotFound(GenericErrorResponse("not_found", "Job ID is not found").asJson)
case Some(Left(err))=>
logger.error(s"Could not look up job info: ${err.toString}")
InternalServerError(GenericErrorResponse("db_error", s"Could not look up job: ${err.toString}").asJson)
case Some(Right(jobModel))=>
Ok(ObjectGetResponse("ok", "job", jobModel).asJson)
})
}
def jobSearch(clientLimit:Int) = IsAuthenticatedAsync(circe.json(2048)) { uid=> request=>
implicit val daoImplicit = jobModelDAO
request.body.as[JobSearchRequest] match {
case Left(err)=>
Future(BadRequest(GenericErrorResponse("bad_request", err.toString).asJson))
case Right(rq)=>
rq.runSearch(clientLimit).map({
case Left(errors)=>
logger.error("Could not perform job search: ")
errors.foreach(err=>logger.error(err.toString))
InternalServerError(GenericErrorResponse("db_error", errors.head.toString).asJson)
case Right(results)=>
Ok(ObjectListResponse("ok","job",results, results.length).asJson)
}).recoverWith({
case err:Throwable=>
logger.error("Could not scan jobs: ", err)
Future(InternalServerError(GenericErrorResponse("db_error", err.toString).asJson))
})
}
}
def rerunProxy(jobId:String) = IsAuthenticatedAsync { uid=> request=>
Try { UUID.fromString(jobId) } match {
case Success(jobUuid) =>
proxyGenerators.rerunProxyJob(jobUuid).map({
case Right(jobId) =>
Ok(ObjectCreatedResponse("ok","jobId",jobId).asJson)
case Left(err) =>
InternalServerError(GenericErrorResponse("error", err).asJson)
})
case Failure(err) =>
Future(BadRequest(GenericErrorResponse("error", "You did not input a valid UUID").asJson))
}
}
def refreshTranscodeInfo(jobId:String) = IsAuthenticatedAsync { _=> _=>
implicit val timeout:akka.util.Timeout = 30 seconds
// jobModelDAO.jobForId(jobId).flatMap({
// case None=>
// Future(NotFound(GenericErrorResponse("not_found","job ID not found").asJson))
// case Some(Left(err))=>
// logger.error(s"Could not read from jobs database: ${err.toString}")
// Future(InternalServerError(GenericErrorResponse("db_error", err.toString).asJson))
// case Some(Right(jobModel))=>
// val resultFuture = (etsProxyActor ? ETSProxyActor.ManualJobStatusRefresh(jobModel)).mapTo[ETSMsgReply]
// resultFuture.map({
// case ETSProxyActor.PreparationFailure(err)=>
// logger.error("Could not refresh transcode info", err)
// InternalServerError(GenericErrorResponse("error", err.toString).asJson)
// case ETSProxyActor.PreparationSuccess(transcodeId, rtnJobId)=>
// Ok(ObjectGetResponse("ok","job_id",rtnJobId).asJson)
// })
// })
Future(InternalServerError(GenericErrorResponse("not_implemented","Not currently implemented").asJson))
}
def reRunJobsForCollection(collectionName:String) = IsAuthenticatedAsync { _=> _=>
jobModelDAO.jobsForType("ProblemItemRerun").map(resultList=>{
val failures = resultList.collect({case Left(err)=>err})
if(failures.nonEmpty){
InternalServerError(ErrorListResponse("error","Could not look up jobs", failures.map(_.toString)).asJson)
}
val results = resultList.collect({case Right(job)=>job}).filter(_.sourceId==collectionName)
Ok(ObjectListResponse("ok","Job", results, results.length).asJson)
})
}
}