app/controllers/ImportController.scala
package controllers

import akka.actor.{ActorRef, ActorSystem}
import akka.stream.Materializer
import auth.{BearerTokenAuth, Security}
import io.circe.generic.auto._
import io.circe.syntax._
import com.sksamuel.elastic4s.http.ElasticClient
import com.theguardian.multimedia.archivehunter.common.{ArchiveEntry, Indexer, ProxyLocation, ProxyLocationDAO, ProxyTypeEncoder}
import com.theguardian.multimedia.archivehunter.common.clientManagers.{DynamoClientManager, ESClientManager, S3ClientManager}
import com.theguardian.multimedia.archivehunter.common.cmn_helpers.PathCacheExtractor
import com.theguardian.multimedia.archivehunter.common.cmn_models.{PathCacheEntry, PathCacheIndexer, ScanTarget, ScanTargetDAO}
import helpers.{IndexerFactory, ProxyLocator}
import org.scanamo.DynamoReadError
import org.slf4j.LoggerFactory
import play.api.Configuration
import play.api.cache.SyncCacheApi
import play.api.libs.circe.Circe
import play.api.mvc.{AbstractController, ControllerComponents, Result}
import requests.{ProxyImportRequest, SpecificImportRequest}
import responses.{GenericErrorResponse, ObjectCreatedResponse}
import services.IngestProxyQueue
import software.amazon.awssdk.regions.Region
import software.amazon.awssdk.services.s3.S3Client

import javax.inject.{Inject, Named, Singleton}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.Future
import scala.util.Try

@Singleton
class ImportController @Inject()(override val config:Configuration,
                                 override val controllerComponents:ControllerComponents,
                                 scanTargetDAO:ScanTargetDAO,
                                 proxyLocationDAO:ProxyLocationDAO,
                                 s3ClientMgr: S3ClientManager,
                                 esClientMgr: ESClientManager,
                                 override val bearerTokenAuth: BearerTokenAuth,
                                 override val cache:SyncCacheApi,
                                 ddbClientMgr:DynamoClientManager,
                                 indexerFactory: IndexerFactory,
                                 @Named("ingestProxyQueue") ingestProxyQueue:ActorRef)
                                (implicit actorSystem:ActorSystem, mat:Materializer)
  extends AbstractController(controllerComponents) with Security with Circe with ProxyTypeEncoder {

  import com.theguardian.multimedia.archivehunter.common.cmn_helpers.S3ClientExtensions._

  override protected val logger = LoggerFactory.getLogger(getClass)
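
  //clients and indexers shared by every request; the optional AWS profile comes from the "externalData.awsProfile" setting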
  private val awsProfile = config.getOptional[String]("externalData.awsProfile")
  private implicit val esClient = esClientMgr.getClient()
  private implicit val pathCacheIndexer = new PathCacheIndexer(config.getOptional[String]("externalData.pathCacheIndex").getOrElse("pathcache"), esClient)
  private implicit val ddbClient = ddbClientMgr.getNewAsyncDynamoClient(awsProfile)
  private implicit val indexer = indexerFactory.get()
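
  /**
   * Upserts the given entries into the path cache index in Elasticsearch.
   * @param newCacheEntries entries to write
   * @param pathCacheIndexer implicitly provided PathCacheIndexer, giving the index to write to
   * @param elasticClient implicitly provided ElasticClient to perform the writes with
   * @return a Future that completes once every write has been attempted; individual failures are logged rather than failing the Future
   */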
  def writePathCacheEntries(newCacheEntries:Seq[PathCacheEntry])
                           (implicit pathCacheIndexer:PathCacheIndexer, elasticClient:ElasticClient) = {
    import com.sksamuel.elastic4s.http.ElasticDsl._
    import io.circe.generic.auto._
    import com.sksamuel.elastic4s.circe._

    Future.sequence(
      newCacheEntries.map(entry=>elasticClient.execute(
        update(entry.collection + entry.key) in s"${pathCacheIndexer.indexName}/pathcache" docAsUpsert entry
      ))
    ).map(responses=>{
      val failures = responses.filter(_.isError)
      if(failures.nonEmpty) {
        logger.error(s"${failures.length} path cache entries failed:")
        failures.foreach(err=>logger.error(err.error.reason))
      }
      logger.info(s"${failures.length} / ${newCacheEntries.length} path cache entries failed")
    })
  }
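
  /**
   * Generates path cache entries for every parent directory of the incoming item and writes them to the path cache.
   * @param importRequest the SpecificImportRequest being processed
   * @return a Future that completes when the path cache writes have been attempted
   */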
  def addToPathCache(importRequest:SpecificImportRequest) = {
    //build a list of entries to add to the path cache
    val pathParts = importRequest.itemPath.split("/").init //the last element is the filename, which we are not interested in.

    val newCacheEntries = if(pathParts.isEmpty) {
      Seq()
    } else {
      PathCacheExtractor.recursiveGenerateEntries(pathParts.init, pathParts.last, pathParts.length, importRequest.collectionName)
    }
    logger.info(s"going to update ${newCacheEntries.length} path cache entries")
    writePathCacheEntries(newCacheEntries)
  }
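
  /**
   * Endpoint that registers a single item in the index, given its collection (bucket) and path.
   * Checks that the scan target exists and is enabled and that the object is present in S3, then indexes it,
   * notifies the ingest proxy queue and updates the path cache.
   * @return an ObjectCreatedResponse with the new item ID on success, or an appropriate error response
   */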
  def importFromPath = IsAuthenticatedAsync(circe.json(2048)) { uid=> request=>
    request.body.as[SpecificImportRequest] match {
      case Left(err)=>
        Future(BadRequest(GenericErrorResponse("bad_request",err.toString()).asJson))
      case Right(importRequest)=>
        scanTargetDAO.targetForBucket(importRequest.collectionName).flatMap({
          case None=>
            Future(BadRequest(GenericErrorResponse("not_found", "The given collection does not exist").asJson))
          case Some(Left(err))=>
            logger.error(s"Could not look up scan target for bucket: $err")
            Future(InternalServerError(GenericErrorResponse("db_error","Could not look up scan target").asJson))
          case Some(Right(scanTarget))=>
            if(scanTarget.enabled) {
              implicit val s3client:S3Client = s3ClientMgr.getClient(awsProfile)
              if(s3client.doesObjectExist(scanTarget.bucketName, importRequest.itemPath).get) {
                val entry = ArchiveEntry.fromS3Sync(scanTarget.bucketName, importRequest.itemPath, None, scanTarget.region) //importing from path => take latest version
                indexer.indexSingleItem(entry).flatMap({
                  case Left(err)=>
                    logger.error(s"Could not index new item $entry: $err")
                    Future(InternalServerError(GenericErrorResponse("index_error","Could not index new item, see server logs").asJson))
                  case Right(newId)=>
                    logger.info(s"Registered new item with ID $newId, adding to path cache")
                    ingestProxyQueue ! IngestProxyQueue.CheckRegisteredProxy
                    ingestProxyQueue ! IngestProxyQueue.CheckRegisteredThumb
                    ingestProxyQueue ! IngestProxyQueue.StartAnalyse

                    addToPathCache(importRequest).map(_=>{
                      Ok(ObjectCreatedResponse("ok","item",newId).asJson)
                    }).recover({
                      case err:Throwable=>
                        logger.error(s"Could not add to path cache, but did create item: $err")
                        Ok(ObjectCreatedResponse("ok","item",newId).asJson)
                    })
                })
              } else {
                Future(NotFound(GenericErrorResponse("not_found","The given file does not exist").asJson))
              }
            } else {
              Future(BadRequest(GenericErrorResponse("disabled","This scan target is disabled").asJson))
            }
        }).recover({
          case err:Throwable=>
            logger.error(s"Specific import request crashed: ${err.getMessage}", err)
            InternalServerError(GenericErrorResponse("error","The import process failed, please see server logs for details").asJson)
        })
    }
  }
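
  /**
   * Extracts the proxy bucket name from a scan target lookup result.
   * @param tgt the result of a ScanTargetDAO lookup for the item's bucket
   * @return a Future containing the proxy bucket name, which fails if the lookup errored or found nothing
   */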
  def proxyBucketForMaybeScanTarget(tgt:Option[Either[DynamoReadError, ScanTarget]]):Future[String] = tgt match {
    case None=>
      Future.failed(new RuntimeException("No scan target existed for the item's bucket!"))
    case Some(Left(err))=>
      Future.failed(new RuntimeException(err.toString))
    case Some(Right(actualTarget))=>
      Future(actualTarget.proxyBucket)
  }

  /**
   * Wraps the "saveProxy" method to make it a bit nicer for composition
   * @param newRecord record to save
   * @return a Future which completes with the saved record on success and fails on error
   */
  private def wrapSaveProxy(newRecord:ProxyLocation) = {
    proxyLocationDAO.saveProxy(newRecord).map(_=>newRecord)
  }

  /**
   * Ensures that the requested proxy exists and if so 'connects' it as a proxy to the given item
   * @param importRequest ProxyImportRequest containing information about the proxy to attach
   * @param item ArchiveEntry representing the item to attach it to
   * @param correctProxyBucket validated proxy bucket that it should come from
   * @return a Future, containing a Play result
   */
  private def performProxyImport(importRequest:ProxyImportRequest, item:ArchiveEntry, correctProxyBucket:String) = {
    implicit val s3client:S3Client = s3ClientMgr.getS3Client(region=item.region.map(Region.of))

    Future
      .fromTry(s3client.doesObjectExist(correctProxyBucket, importRequest.proxyPath))
      .flatMap({
        case false => Future(BadRequest(GenericErrorResponse("error", "that proxy does not exist").asJson))
        case true =>
          ProxyLocation.fromS3(
            correctProxyBucket,
            importRequest.proxyPath,
            item.bucket,
            item.path,
            Some(importRequest.proxyType),
            Region.of(item.region.getOrElse(config.get[String]("externalData.awsRegion")))
          ).flatMap({
            case Left(err)=>
              Future(InternalServerError(GenericErrorResponse("error",s"Could not get proxy location from s3 $err").asJson))
            case Right(newRecord)=>
              Future.sequence(Seq(
                wrapSaveProxy(newRecord),
                ProxyLocator.setProxiedWithRetry(item.id)
              )).map(_=>Ok(GenericErrorResponse("ok","proxy set").asJson))
          })
      })
  }

  /**
   * Ensures that the requested proxy bucket does not conflict with the configured one and that there is not already
   * a proxy of the requested type in place (unless we are allowed to overwrite). If both these conditions are met,
   * calls performProxyImport to do the actual import
   * @param importRequest ProxyImportRequest containing information about the proxy to attach
   * @param item ArchiveEntry representing the item to attach it to
   * @param correctProxyBucket validated proxy bucket that it should come from
   * @param existingProxies looked-up list of existing proxies for the given item
   * @return a Future, containing a Play response
   */
  def checkAndPerformProxyImport(importRequest:ProxyImportRequest, item:ArchiveEntry, correctProxyBucket:String, existingProxies:List[ProxyLocation]):Future[Result] = {
    if(importRequest.proxyBucket.exists(_ != correctProxyBucket)) {
      Future(Conflict(GenericErrorResponse("conflict","incorrect proxy bucket for item").asJson))
    } else if(existingProxies.exists(_.proxyType == importRequest.proxyType) && !importRequest.overwrite.getOrElse(false)) {
      Future(Conflict(GenericErrorResponse("conflict",s"a proxy of type ${importRequest.proxyType} already exists").asJson))
    } else {
      performProxyImport(importRequest, item, correctProxyBucket)
    }
  }

  /**
   * Simplifies the Dynamo read response by failing the Future if any errors are present
   * @param input original response from DynamoDB
   * @return a Future containing a list of ProxyLocation. If any read fails, the whole Future fails.
   */
  private def simplifyDynamoReturn(input:Future[List[Either[DynamoReadError, ProxyLocation]]]):Future[List[ProxyLocation]] = input.flatMap(results=>{
    val failures = results.collect({case Left(err)=>err})
    if(failures.nonEmpty) {
      Future.failed(new RuntimeException(failures.map(_.toString).mkString(";")))
    } else {
      Future(results.collect({case Right(result)=>result}))
    }
  })
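
  /**
   * Endpoint that attaches an existing file in the proxy bucket as a proxy for an already-indexed item.
   * Looks up the item and its scan target, validates the requested proxy bucket against the configured one and
   * checks for existing proxies before delegating to checkAndPerformProxyImport.
   * @return a success response if the proxy was attached, or an appropriate error response
   */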
  def importProxy = IsAuthenticatedAsync(circe.json(2048)) { uid=> request=>
    request.body.as[ProxyImportRequest] match {
      case Left(err)=>
        Future(BadRequest(GenericErrorResponse("bad_request", err.toString()).asJson))
      case Right(importRequest)=>
        val result = for {
          item <- indexer.getById(importRequest.itemId) //this version fails on a RuntimeException if there is no item present
          itemsScanTarget <- scanTargetDAO.targetForBucket(item.bucket)
          proxyBucket <- proxyBucketForMaybeScanTarget(itemsScanTarget)
          proxyRecords <- simplifyDynamoReturn(proxyLocationDAO.getAllProxiesFor(item.id))
          result <- checkAndPerformProxyImport(importRequest, item, proxyBucket, proxyRecords)
        } yield result

        result.recover({
          case err:Throwable=>
            if(err.getMessage=="Item could not be found") {
              logger.error(s"Proxy import request $importRequest from $uid references non-existing item")
              NotFound(GenericErrorResponse("bad_request","invalid item").asJson)
            } else {
              logger.error(s"Could not perform import request $importRequest: ${err.getMessage}", err)
              InternalServerError(GenericErrorResponse("server_error","see logs").asJson)
            }
        })
    }
  }
}