app/controllers/ProxiesController.scala (415 lines of code) (raw):
package controllers
import java.time.ZonedDateTime
import java.util.UUID
import akka.actor.{ActorRef, ActorSystem}
import akka.pattern.AskTimeoutException
import akka.stream.scaladsl.Sink
import akka.stream.{ActorMaterializer, Materializer}
import akka.util.Timeout
import auth.{BearerTokenAuth, Security}
import com.theguardian.multimedia.archivehunter.common.clientManagers.{DynamoClientManager, ESClientManager, S3ClientManager}
import com.amazonaws.HttpMethod
import com.amazonaws.services.s3.model.{AmazonS3Exception, GeneratePresignedUrlRequest, GetObjectMetadataRequest, ObjectMetadata}
import org.scanamo.{DynamoReadError, ScanamoAlpakka, Table}
import com.theguardian.multimedia.archivehunter.common._
import javax.inject.{Inject, Named, Singleton}
import play.api.{Configuration, Logger}
import play.api.libs.circe.Circe
import play.api.mvc.{AbstractController, ControllerComponents}
import responses._
import scala.concurrent.ExecutionContext.Implicits.global
import io.circe.generic.auto._
import io.circe.syntax._
import org.scanamo.syntax._
import org.scanamo.generic.auto._
import com.theguardian.multimedia.archivehunter.common.errors.{ExternalSystemError, NothingFoundError}
import com.theguardian.multimedia.archivehunter.common.cmn_models._
import com.theguardian.multimedia.archivehunter.common.cmn_models.{JobModelDAO, ScanTargetDAO}
import helpers.{ProxyLocator, S3Helper}
import services.ProxiesRelinker
import com.theguardian.multimedia.archivehunter.common.ProxyTranscodeFramework.{ProxyGenerators, RequestType}
import org.slf4j.LoggerFactory
import play.api.cache.SyncCacheApi
import requests.ManualProxySet
import software.amazon.awssdk.regions.Region
import software.amazon.awssdk.services.s3.model.{DeleteObjectRequest, HeadObjectRequest, HeadObjectResponse, NoSuchKeyException}
import scala.concurrent.duration._
import scala.concurrent.Future
import scala.util.{Failure, Success, Try}
@Singleton
class ProxiesController @Inject()(override val config:Configuration,
override val controllerComponents:ControllerComponents,
ddbClientMgr: DynamoClientManager,
esClientMgr:ESClientManager,
proxyGenerators: ProxyGenerators,
override val bearerTokenAuth:BearerTokenAuth,
override val cache:SyncCacheApi,
@Named("proxiesRelinker") proxiesRelinker:ActorRef)
(implicit actorSystem:ActorSystem,
mat:Materializer,
scanTargetDAO:ScanTargetDAO,
jobModelDAO:JobModelDAO,
proxyLocationDAO:ProxyLocationDAO,
s3ClientMgr:S3ClientManager)
extends AbstractController(controllerComponents) with Circe with ProxyLocationEncoder with Security {
import akka.pattern.ask
import com.theguardian.multimedia.archivehunter.common.cmn_helpers.S3ClientExtensions._
override protected val logger=LoggerFactory.getLogger(getClass)
private val indexName = config.getOptional[String]("elasticsearch.index").getOrElse("archivehunter")
private val awsProfile = config.getOptional[String]("externalData.awsProfile")
protected val tableName:String = config.get[String]("proxies.tableName")
private val table = Table[ProxyLocation](tableName)
implicit val esClient = esClientMgr.getClient()
implicit val timeout:Timeout = 55 seconds
implicit val indexer = new Indexer(indexName)
val proxyLinkExpiry = 900 //presigned links work for 15 minutes
val defaultRegion = Region.of(config.get[String]("externalData.awsRegion"))
private implicit val ddbAsync = ddbClientMgr.getNewAsyncDynamoClient(awsProfile)
private val scanamoAlpakka = ScanamoAlpakka(ddbAsync)
type ProxyDataType = List[Either[DynamoReadError, ProxyLocation]]
private val MakeProxyDataSink = Sink.fold[ProxyDataType, ProxyDataType](List())(_ ++ _)
def proxyForId(fileId:String, proxyType:Option[String]) = IsAuthenticatedAsync { _=> _=>
proxyType match {
case None=>
scanamoAlpakka
.exec(table.query("fileId"===fileId))
.runWith(Sink.head)
.map(result=>{
val failures = result.collect({case Left(err)=>err})
if(failures.nonEmpty){
logger.error(s"Could not look up proxy for $fileId: $failures")
InternalServerError(GenericErrorResponse("error",failures.map(_.toString).mkString(", ")).asJson)
} else {
val output = result.collect({case Right(entry)=>entry})
Ok(ObjectListResponse("ok","proxy_location",output, output.length).asJson)
}
})
case Some(t)=>
scanamoAlpakka
.exec(table.get("fileId"===fileId and ("proxyType"===t.toUpperCase)))
.runWith(Sink.head)
.map({
case None=>
NotFound(GenericErrorResponse("not_found","No proxy was registered").asJson)
case Some(Left(err))=>
logger.error(s"Could not look up proxy for $fileId: ${err.toString}")
InternalServerError(GenericErrorResponse("db_error", err.toString).asJson)
case Some(Right(items))=>
Ok(responses.ObjectGetResponse("ok","proxy_location",items).asJson)
})
}
}
def getAllProxyRefs(fileId:String) = IsAuthenticatedAsync { _=> _=>
scanamoAlpakka
.exec(table.query("fileId"===fileId))
.runWith(MakeProxyDataSink)
.map(results=>{
val failures = results.collect({ case Left(err) => err})
if(failures.nonEmpty){
failures.foreach(err=>logger.error(s"Could not retrieve proxy reference: $err"))
InternalServerError(GenericErrorResponse("db_error", failures.mkString(", ")).asJson)
} else {
val success = results.collect({ case Right(location)=>location})
Ok(ObjectListResponse("ok","ProxyLocation",success, success.length).asJson)
}
})
}
def getPlayable(fileId:String, proxyType:Option[String]) = IsAuthenticatedAsync { _=> _=>
val actualType = proxyType match {
case None=>"VIDEO"
case Some(t)=>t.toUpperCase
}
scanamoAlpakka
.exec(table.get("fileId"===fileId and ("proxyType"===actualType)))
.runWith(Sink.head)
.map({
case None=>
NotFound(GenericErrorResponse("not_found",s"no $proxyType proxy found for $fileId").asJson)
case Some(Right(proxyLocation))=>
implicit val s3client = s3ClientMgr.getS3Client(awsProfile, proxyLocation.region.map(Region.of))
val expiration = new java.util.Date()
expiration.setTime(expiration.getTime + (1000 * 60 * 60)) //expires in 1 hour
val result = for {
meta <- Try {
val req = HeadObjectRequest.builder().bucket(proxyLocation.bucketName).key(proxyLocation.bucketPath).build()
s3client.headObject(req)
}
presignedUrl <- S3Helper.getPresignedURL(proxyLocation, proxyLinkExpiry, defaultRegion, awsProfile)
result <- Try {
val mimeType = MimeType.fromString(meta.contentType()) match {
case Left(str) =>
logger.warn(s"Could not get MIME type for s3://${proxyLocation.bucketName}/${proxyLocation.bucketPath}: $str")
MimeType("application", "octet-stream")
case Right(t) => t
}
Ok(PlayableProxyResponse("ok", presignedUrl.toString, mimeType).asJson)
}
} yield result
result match {
case Success(result)=>result
case Failure(_:NoSuchKeyException)=>
logger.warn(s"Invalid proxy location: $proxyLocation does not point to an existing file")
NotFound(GenericErrorResponse("invalid_location",s"No proxy found for $proxyType on $fileId").asJson)
case Failure(err)=>
logger.error(s"Could not get metadata for s3://${proxyLocation.bucketName}/${proxyLocation.bucketPath}: ${err.getMessage}", err)
InternalServerError(GenericErrorResponse("error","Storage error, see logs for details").asJson)
}
case Some(Left(err))=>
InternalServerError(GenericErrorResponse("db_error", err.toString).asJson)
})
}
/**
* endpoint that performs a scan for potential proxies for the given file.
* if there is only one result, it is automatically associated.
* @param fileId ES index file ID
* @return
*/
def searchFor(fileId:String) = IsAuthenticatedAsync { _=> _=>
val resultFuture = indexer.getById(fileId).flatMap(entry=>{
implicit val s3client = s3ClientMgr.getS3Client(awsProfile, entry.region.map(Region.of))
ProxyLocator.findProxyLocation(entry)
})
resultFuture
.map(potentialProxiesResult=>{
val failures = potentialProxiesResult.collect({case Left(err)=>err})
if(failures.nonEmpty) throw new RuntimeException("Failed to get potential proxies")
potentialProxiesResult.collect({case Right(potentialProxy)=>potentialProxy})
})
.map(potentialProxies=>{
if(potentialProxies.length==1){
//if we have an unambigous map, save it right away.
indexer.getById(fileId).map(_.registerNewProxy(potentialProxies.head))
}
//otherwise, send the results back to the client
Ok(ObjectListResponse("ok","potential_proxies", potentialProxies, potentialProxies.length).asJson)
})
.recover({
case ex:Throwable=>
logger.error("Could not search for proxy")
InternalServerError(GenericErrorResponse("error", ex.toString).asJson)
})
}
/**
* endpoint to associate a given proxy with the item. ProxyId does not have to exist in the database yet;
* if not, then all potential proxies for `fileId` are found and the ids checked off against ProxyId.
* The idea is that from the frontend you can call searchFor, if this returns multiple entries you can call
* `associate` to both save that specific item to the database and link it to the provided fileId
* @param maybeFileId ES id of the file to associate with. This is an Option to make it compatible with a URL parameter; passing
* None simply results in a 400 Bad Request error.
* @param proxyId Proxy ID of the proxy to link to fileId. Get this from `searchFor`.
*/
def associate(maybeFileId:Option[String], proxyId:String) = IsAuthenticatedAsync { _=> _=>
maybeFileId match {
case None =>
Future(BadRequest(GenericErrorResponse("bad_request", "you must specify fileId={es-id}").asJson))
case Some(fileId) =>
val proxyLocationFuture = proxyLocationDAO.getProxyByProxyId(proxyId).flatMap({
case None => //no proxy with this ID in the database yet; do an S3 scan to try to find the requested id
val potentialProxyOrErrorList = indexer.getById(fileId).flatMap(entry=>{
implicit val s3client = s3ClientMgr.getS3Client(awsProfile, entry.region.map(Region.of))
ProxyLocator.findProxyLocation(entry)
})
potentialProxyOrErrorList.map(_.collect({case Right(loc)=>loc})).map(_.find(_.proxyId==proxyId))
case Some(proxyLocation) => //found it in the database
Future(Some(proxyLocation.copy(fileId = fileId)))
})
proxyLocationFuture.flatMap({
case None =>
Future(NotFound(GenericErrorResponse("not_found", "No proxy could be found either in the database or matching given file id").asJson))
case Some(proxyLocation) =>
logger.debug(s"Got proxy location $proxyLocation")
indexer
.getById(fileId)
.map(_.registerNewProxy(proxyLocation))
.map(updated => Ok(ObjectCreatedResponse("registered", "proxy", proxyLocation).asJson))
}).recover({
case ex: Throwable =>
logger.error("Could not associate proxy:", ex)
InternalServerError(GenericErrorResponse("error", ex.toString).asJson)
})
}
}
def generateThumbnail(fileId:String) = IsAuthenticatedAsync { _=> _=>
proxyGenerators.requestProxyJob(RequestType.THUMBNAIL, fileId, None).map({
case Failure(NothingFoundError(objectType, msg))=>
NotFound(GenericErrorResponse("not_found", msg.toString).asJson)
case Failure(ExternalSystemError(systemName, msg))=>
InternalServerError(GenericErrorResponse("error",s"Could not launch task: $msg").asJson)
case Failure(genericError)=>
InternalServerError(GenericErrorResponse("error", genericError.toString).asJson)
case Success(taskId)=>
Ok(responses.ObjectGetResponse("ok","task",taskId).asJson)
})
}
def generateProxy(fileId:String, typeStr:String) = IsAuthenticatedAsync { _=> _=>
try {
val pt = ProxyType.withName(typeStr.toUpperCase)
indexer.getById(fileId).flatMap(entry=>{
val canContinue = entry.mimeType.major.toLowerCase match {
case "application"=>
if(entry.mimeType.minor.toLowerCase=="octet-stream"){
//application/octet-stream could be anything, so let it go through.
Right(true)
} else {
Left(s"Can't proxy media of type ${entry.mimeType.toString}")
}
case "binary"=>
if(entry.mimeType.minor.toLowerCase=="octet-stream"){
//application/octet-stream could be anything, so let it go through.
Right(true)
} else {
Left(s"Can't proxy media of type ${entry.mimeType.toString}")
}
case "video"=> //video can proxy to anything
Right(true)
case "audio"=>
if(pt==ProxyType.VIDEO){
Left("Can't make a video proxy of an audio item")
} else {
Right(true)
}
case "image"=>
if(pt==ProxyType.AUDIO || pt==ProxyType.VIDEO){
Left("Can't make audio or video proxy of an image item")
} else {
Right(true)
}
case _=>
Left(s"Can't proxy media of type ${entry.mimeType.toString}")
}
canContinue match {
case Right(_)=>
val requestType = pt match {
case ProxyType.THUMBNAIL=>RequestType.THUMBNAIL
case _=>RequestType.PROXY
}
proxyGenerators.requestProxyJob(requestType,entry,Some(pt)).map({
case Success(jobId)=>
Ok(TranscodeStartedResponse("transcode_started", jobId, None).asJson)
case Failure(err)=>
InternalServerError(GenericErrorResponse("not_started", err.toString).asJson)
})
case Left(err)=>
Future(BadRequest(GenericErrorResponse("bad_request",err).asJson))
}
}).recoverWith({
case timeout:AskTimeoutException=>
logger.warn("Ask timed out: ", timeout)
Future(Ok(GenericErrorResponse("warning", "proxy request timed out server-side, may not have started").asJson))
case ex:Throwable=>
logger.error("Could not trigger proxy: ", ex)
Future(InternalServerError(GenericErrorResponse("error", ex.toString).asJson))
})
} catch {
case ex:Throwable=>
logger.error("Could not request proxy: ", ex)
Future(BadRequest(GenericErrorResponse("bad_request", ex.toString).asJson))
}
}
def relinkAllProxies = IsAuthenticatedAsync { _=> _=>
val jobId = UUID.randomUUID().toString
val jobDesc = JobModel(jobId,"RelinkProxies",None,None,JobStatus.ST_PENDING,None,"global",None,SourceType.SRC_GLOBAL,None)
jobModelDAO.putJob(jobDesc).map(_=> {
proxiesRelinker ! ProxiesRelinker.RelinkAllRequest(jobId)
Ok(responses.ObjectCreatedResponse("ok", "job", jobId).asJson)
}).recover({
case dberr:Throwable=>
logger.error(s"Could not create job entry: ${dberr.getMessage}", dberr)
InternalServerError(GenericErrorResponse("db_error",dberr.toString).asJson)
})
}
def relinkProxiesForTarget(scanTargetName:String) = IsAuthenticatedAsync { _=> _=>
val jobId = UUID.randomUUID().toString
val jobDesc = JobModel(jobId, "RelinkProxies", Some(ZonedDateTime.now()), None, JobStatus.ST_RUNNING, None, scanTargetName, None, SourceType.SRC_SCANTARGET, None)
jobModelDAO.putJob(jobDesc).flatMap(_=>{
proxiesRelinker ! ProxiesRelinker.RelinkScanTargetRequest(jobId, scanTargetName)
scanTargetDAO.withScanTarget(scanTargetName) { target =>
val updatedScanTarget = target.withAnotherPendingJob(jobDesc.jobId)
scanTargetDAO.put(updatedScanTarget)
} map {
case None =>
Ok(responses.ObjectCreatedResponse("ok", "job", jobId).asJson)
case Some(Left(err)) =>
logger.error(s"Could not updated scan target: $err")
InternalServerError(GenericErrorResponse("db_error", err.toString).asJson)
case Some(Right(rec)) =>
Ok(responses.ObjectCreatedResponse("ok", "job", jobId).asJson)
}
}).recover({
case dberr:Throwable=>
logger.error(s"Could not initiate relink for target $scanTargetName: ${dberr.getMessage}", dberr)
InternalServerError(GenericErrorResponse("db_error", dberr.getMessage).asJson)
})
}
def checkProxyExists(bucket:String, path:String, region:Region):Try[Option[HeadObjectResponse]] = {
val s3conn = s3ClientMgr.getS3Client(config.getOptional[String]("externalData.awsProfile"), Some(region))
try {
val result = s3conn.headObject(HeadObjectRequest.builder().bucket(bucket).key(path).build())
Success(Some(result))
} catch {
case ex:AmazonS3Exception=>
if(ex.getStatusCode==404){ //object does not exist
Success(None)
} else {
Failure(ex)
}
case ex:Throwable=>
Failure(ex)
}
}
/**
* this represents an endpoint that allows the manual association of a proxy uri to an item
* you trigger it with a ManualProxySet request passed as Json
* if the given item already has a proxy then a 409 Conflict is returned
* if nothing can be read by the server at the proxy address given then 404 Not Found is returned
* if the proxy is set then 200 OK is returned
* otherwise a 400 for invalid data or a 500 if there is a server-side error
* @return
*/
def manualSet = IsAdminAsync(circe.json(2048)) { _=> request=>
request.body.as[ManualProxySet].fold(
failure =>
Future(BadRequest(GenericErrorResponse("bad_request", failure.toString).asJson)),
proxySetRequest => {
proxyLocationDAO.getProxy(proxySetRequest.entryId, proxySetRequest.proxyType).flatMap({
case Some(existingProxy) =>
Future(Conflict(responses.ObjectCreatedResponse("proxy_exists", "proxy_id", existingProxy.proxyId).asJson))
case None =>
checkProxyExists(proxySetRequest.proxyBucket, proxySetRequest.proxyPath, Region.of(proxySetRequest.region)) match {
case Success(None) => //proxy does not exist
Future(NotFound(GenericErrorResponse("no_proxy", "Requested proxy file does not exist").asJson))
case Failure(err) =>
logger.error(s"Could not access proxy at s3://${proxySetRequest.proxyBucket}/${proxySetRequest.proxyType}: ", err)
Future(InternalServerError(GenericErrorResponse("error", err.toString).asJson))
case Success(Some(proxyMeta)) => //proxy exists
val newLoc = ProxyLocation.fromS3(proxySetRequest.proxyBucket, proxySetRequest.proxyPath, proxySetRequest.entryId, proxyMeta, Some(proxySetRequest.region), proxySetRequest.proxyType)
proxyLocationDAO.saveProxy(newLoc).map(_=> {
Ok(ObjectCreatedResponse("ok", "proxy", newLoc.proxyId).asJson)
}).recover({
case err:Throwable =>
InternalServerError(GenericErrorResponse("db_error", err.toString).asJson)
})
}
})
}
)
}
def analyseMetadata(entryId:String) = IsAdminAsync { _=> _=>
indexer.getById(entryId).flatMap(entry => {
proxyGenerators
.requestMetadataAnalyse(entry, config.getOptional[String]("externalData.awsRegion").getOrElse("eu-west-1"))
.map({
case Left(err) =>
logger.error(s"Could not request analyse: $err")
InternalServerError(GenericErrorResponse("error", err).asJson)
case Right(jobId) =>
Ok(ObjectCreatedResponse("ok", "job", jobId).asJson)
})
}).recover({
case err: Throwable =>
logger.error("Could not request analyse: ", err)
InternalServerError(GenericErrorResponse("error", err.toString).asJson)
})
}
def deleteProxyFile(proxyLocation:ProxyLocation) = Try {
val s3conn = s3ClientMgr.getS3Client(config.getOptional[String]("externalData.awsProfile"), proxyLocation.region.map(Region.of))
s3conn.deleteObject(DeleteObjectRequest.builder().bucket(proxyLocation.bucketName).key(proxyLocation.bucketPath).build())
}
/**
* manually delete the given proxy.
* @param fileId file ID of the main media
* @param inputProxyType type of proxy to delete.
* @return
*/
def manualDelete(fileId:String, inputProxyType:String) = IsAdminAsync { _=> request=>
try {
val proxyType = ProxyType.withName(inputProxyType)
proxyLocationDAO.getProxy(fileId,proxyType).flatMap({
case None=>Future(NotFound(GenericErrorResponse("not_found","No proxy found").asJson))
case Some(loc)=>
deleteProxyFile(loc) match {
case Success(_)=>
proxyLocationDAO.deleteProxyRecord(fileId, proxyType).map(result=>
Ok(ObjectCreatedResponse("deleted","proxy",s"${fileId}:${inputProxyType}").asJson)
).recoverWith({
case err:Throwable=>
logger.error("Could not delete proxy record in database", err)
Future(InternalServerError(GenericErrorResponse("db_error",err.toString).asJson))
})
case Failure(err)=>
logger.error("Could not delete proxy file: ", err)
Future(InternalServerError(GenericErrorResponse("error", err.toString).asJson))
}
})
} catch {
case ex:Throwable=>
Future(BadRequest(GenericErrorResponse("error",s"Did not recognise proxy type $inputProxyType").asJson))
}
}
}