app/controllers/ScanTargetController.scala (214 lines of code) (raw):
package controllers
import java.time.ZonedDateTime
import java.time.format.DateTimeFormatter
import java.util.UUID
import akka.actor.{ActorRef, ActorSystem}
import akka.stream.scaladsl.Sink
import akka.stream.{ActorMaterializer, Materializer}
import auth.{BearerTokenAuth, Security}
import org.scanamo._
import org.scanamo.syntax._
import org.scanamo.generic.auto._
import com.theguardian.multimedia.archivehunter.common.ZonedDateTimeEncoder
import javax.inject.{Inject, Named, Singleton}
import play.api.{Configuration, Logger}
import play.api.mvc._
import io.circe.generic.auto._
import io.circe.syntax._
import play.api.libs.circe.Circe
import responses.{CheckNotificationResponse, GenericErrorResponse, ObjectCreatedResponse, ObjectGetResponse, ObjectListResponse}
import com.theguardian.multimedia.archivehunter.common.clientManagers.DynamoClientManager
import com.theguardian.multimedia.archivehunter.common.ProxyTranscodeFramework.ProxyGenerators
import com.theguardian.multimedia.archivehunter.common.cmn_helpers.ZonedTimeFormat
import com.theguardian.multimedia.archivehunter.common.cmn_models._
import org.slf4j.LoggerFactory
import play.api.cache.SyncCacheApi
import services.{BucketNotificationConfigurations, BucketScanner, BulkThumbnailer, LegacyProxiesScanner}
import software.amazon.awssdk.regions.Region
import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global
import scala.util.{Failure, Success}
import scala.util.control.NonFatal
/**
  * Play controller exposing the scan target endpoints: create, delete, fetch and list scan targets,
  * trigger bucket scans / legacy proxy scans / bulk thumbnailing via actors, request proxy-framework
  * check jobs and pipeline creation, and check or fix bucket notification configuration.
  *
  * Every endpoint in this class is wrapped in `IsAdmin` / `IsAdminAsync` from the `Security` trait.
  *
  * @param bucketScanner        actor that is sent `BucketScanner` scan messages
  * @param proxyScanner         actor that is sent `LegacyProxiesScanner.ScanBucket` messages
  * @param bulkThumbnailer      actor that is sent `BulkThumbnailer.DoThumbnails` messages
  * @param config               application configuration; `externalData.scanTargets` and
  *                             `externalData.awsProfile` are read from it here
  * @param controllerComponents Play controller components passed to `AbstractController`
  * @param bearerTokenAuth      overrides the `bearerTokenAuth` member (presumably required by `Security` - confirm)
  * @param cache                overrides the `cache` member (presumably required by `Security` - confirm)
  * @param ddbClientMgr         source of the DynamoDB clients used to build the Scanamo instances
  * @param proxyGenerators      used to request check jobs and pipeline creation
  * @param scanTargetDAO        data access object used to list and save scan targets
  * @param jobModelDAO          data access object used to save new job records
  * @param bucketNotifications  used to verify (and optionally update) bucket notification setup
  */
@Singleton
class ScanTargetController @Inject() (@Named("bucketScannerActor") bucketScanner:ActorRef,
@Named("legacyProxiesScannerActor") proxyScanner:ActorRef,
@Named("bulkThumbnailerActor") bulkThumbnailer: ActorRef,
override val config:Configuration,
override val controllerComponents:ControllerComponents,
override val bearerTokenAuth:BearerTokenAuth,
override val cache:SyncCacheApi,
ddbClientMgr:DynamoClientManager,
proxyGenerators:ProxyGenerators,
scanTargetDAO:ScanTargetDAO,
jobModelDAO:JobModelDAO,
bucketNotifications:BucketNotificationConfigurations)
(implicit system:ActorSystem, mat:Materializer)
extends AbstractController(controllerComponents) with Security with Circe with ZonedDateTimeEncoder with ZonedTimeFormat with JobModelEncoder {
// slf4j logger named after this class; overrides the logger member of a parent trait
override protected val logger=LoggerFactory.getLogger(getClass)
// Scanamo table definition for ScanTarget records; the table name comes from `externalData.scanTargets`
val table = Table[ScanTarget](config.get[String]("externalData.scanTargets"))
// optional AWS profile name, passed to the Dynamo client manager when building clients
private val profileName = config.getOptional[String]("externalData.awsProfile")
// Scanamo instance used for the direct table operations in this controller (put / delete / get)
private val scanamo = Scanamo(ddbClientMgr.getNewDynamoClient(profileName))
// Alpakka-based Scanamo instance built on the async client
// NOTE(review): not referenced by any method visible in this section - confirm whether it is still needed
private val scanamoAlpakka = ScanamoAlpakka(ddbClientMgr.getNewAsyncDynamoClient(profileName))
/**
  * Stores the scan target given in the JSON request body into the scan targets table.
  * Wrapped in `IsAdmin`, with the body parsed as a [[ScanTarget]].
  *
  * @return 200 with an ObjectCreatedResponse carrying the bucket name if the write did not throw,
  *         or 500 with a GenericErrorResponse carrying the exception message if it did.
  */
def newTarget = IsAdmin(circe.json[ScanTarget]) { _=> request=>
  try {
    scanamo.exec(table.put(request.body))
    Ok(ObjectCreatedResponse[String]("created","scan_target",request.body.bucketName).asJson)
  } catch {
    // only trap non-fatal errors; VM-level failures (OutOfMemoryError, InterruptedException etc.) must propagate
    case NonFatal(err)=>
      logger.error(s"Can't create scan target ${request.body.bucketName}: ${err.getMessage}", err)
      InternalServerError(GenericErrorResponse("error",err.getMessage).asJson)
  }
}
/**
  * Deletes the scan target whose `bucketName` key equals the given name.
  * Wrapped in `IsAdmin`.
  *
  * @param targetName bucket name of the scan target to delete
  * @return 200 with an ObjectCreatedResponse (status "deleted") if the delete did not throw,
  *         or 500 with a GenericErrorResponse carrying the exception message if it did.
  */
def removeTarget(targetName:String) = IsAdmin { _=> _=>
  try {
    scanamo.exec(table.delete("bucketName" === targetName))
    Ok(ObjectCreatedResponse[String]("deleted", "scan_target", targetName).asJson)
  } catch {
    // only trap non-fatal errors; VM-level failures (OutOfMemoryError, InterruptedException etc.) must propagate
    case NonFatal(err)=>
      // the message previously said "create", which was misleading when diagnosing failed deletes
      logger.error(s"Can't delete scan target $targetName: ${err.getMessage}", err)
      InternalServerError(GenericErrorResponse("error",err.getMessage).asJson)
  }
}
/**
  * Fetches a single scan target by its `bucketName` key.
  * Wrapped in `IsAdmin`.
  *
  * @param targetName bucket name of the scan target to fetch
  * @return 200 with the scan target, 404 if no record exists for that name,
  *         or 500 ("database_error") if the record could not be read.
  */
def get(targetName:String) = IsAdmin { _=> _=>
  scanamo.exec(table.get("bucketName"===targetName)) match {
    case Some(Right(scanTarget)) =>
      Ok(ObjectGetResponse[ScanTarget]("ok", "scan_target", scanTarget).asJson)
    case Some(Left(readError)) =>
      InternalServerError(GenericErrorResponse("database_error", readError.toString).asJson)
    case None =>
      NotFound(ObjectCreatedResponse[String]("not_found", "scan_target", targetName).asJson)
  }
}
/**
  * Lists every scan target returned by `scanTargetDAO.allScanTargets()`.
  * Wrapped in `IsAdminAsync`.
  *
  * @return 200 with the full list when every record was read successfully; if any record failed to read,
  *         each error is logged and a 500 is returned with the errors joined by commas.
  */
def listScanTargets = IsAdminAsync { _=> _=>
  scanTargetDAO.allScanTargets().map { lookupResults =>
    val readErrors = lookupResults.collect { case Left(readError) => readError }
    if (readErrors.nonEmpty) {
      // a single unreadable record fails the whole listing
      readErrors.foreach(readError => logger.error(readError.toString))
      InternalServerError(GenericErrorResponse("error", readErrors.map(_.toString).mkString(",")).asJson)
    } else {
      val scanTargets = lookupResults.collect { case Right(scanTarget) => scanTarget }
      Ok(ObjectListResponse[List[ScanTarget]]("ok", "scan_target", scanTargets, scanTargets.length).asJson)
    }
  }
}
/**
  * Looks up the scan target with the given bucket name and hands it to `block`.
  *
  * @param targetName bucket name of the scan target to look up
  * @param block      called with the scan target when it was found and read successfully
  * @return the block's Result, a 404 if no record exists, or a 500 if the record could not be read
  */
private def withLookup(targetName:String)(block: ScanTarget=>Result) =
  scanamo.exec(table.get("bucketName"===targetName )) match {
    case Some(Right(scanTarget)) =>
      block(scanTarget)
    case Some(Left(readError)) =>
      InternalServerError(GenericErrorResponse("error", readError.toString).asJson)
    case None =>
      NotFound(ObjectCreatedResponse[String]("not_found","scan_target",targetName).asJson)
  }
/**
  * Looks up the scan target with the given bucket name and hands it to the asynchronous `block`.
  *
  * @param targetName bucket name of the scan target to look up
  * @param block      called with the scan target when it was found and read successfully
  * @return the block's Future, or an already-completed Future holding a 404 (no such record)
  *         or a 500 (the record could not be read)
  */
private def withLookupAsync(targetName:String)(block: ScanTarget=>Future[Result]): Future[Result] = scanamo
  .exec(table.get("bucketName"===targetName ))
  .map({
    case Left(error)=>
      // the response is already computed, so there is no need to schedule a task on the execution context
      Future.successful(InternalServerError(GenericErrorResponse("error", error.toString).asJson))
    case Right(tgt)=>
      block(tgt)
  })
  .getOrElse(Future.successful(NotFound(ObjectCreatedResponse[String]("not_found","scan_target",targetName).asJson)))
/**
  * Creates a new job record (status ST_RUNNING, started now) for the given scan target, saves it,
  * adds its id to the target's pending jobs, saves the target and then calls `block` with the job.
  * If any of those steps fails (including the block itself throwing) the error is logged and an
  * InternalServerError is returned instead.
  *
  * @param tgt     the scan target being scanned
  * @param jobType job type name to record on the new job
  * @param block   a block receiving the new JobModel and returning the Result to send to the client
  * @return a Future of either the block's Result or an InternalServerError describing the failure
  */
private def withNewScanJob(tgt:ScanTarget, jobType:String)(block: JobModel=>Result) = {
  val jobUuid = UUID.randomUUID()
  val job = JobModel(jobUuid.toString,jobType,Some(ZonedDateTime.now()),None,JobStatus.ST_RUNNING,None,tgt.bucketName,None,SourceType.SRC_SCANTARGET,lastUpdatedTS=None)

  val saved = for {
    _ <- jobModelDAO.putJob(job)
    // only register the job on the target once the job record itself has been written
    _ <- Future.fromTry(scanTargetDAO.put(tgt.withAnotherPendingJob(job.jobId)))
  } yield block(job)

  saved.recover({
    case NonFatal(err)=>
      logger.error(s"Could not create and save new scan job: ${err.getMessage}", err)
      InternalServerError(GenericErrorResponse("db_error","Could not create and save scan job, see logs").asJson)
  })
}
/**
  * Starts a "ManualScan" job on the given scan target: creates the job record, then sends the bucket
  * scanner actor a `PerformDeletionScan` message with `thenScanForNew = true`.
  * Wrapped in `IsAdminAsync`.
  *
  * @param targetName bucket name of the scan target to scan
  * @return 200 once the message has been sent; otherwise the 404/500 produced by the lookup or job creation
  */
def manualTrigger(targetName:String) = IsAdminAsync { _=> _=>
  withLookupAsync(targetName) { scanTarget =>
    withNewScanJob(scanTarget, "ManualScan") { scanJob =>
      val scanRequest = BucketScanner.PerformDeletionScan(scanTarget, thenScanForNew = true, Some(scanJob))
      bucketScanner ! scanRequest
      Ok(GenericErrorResponse("ok", "scan started").asJson)
    }
  }
}
/**
  * Starts an "AdditionScan" job on the given scan target: creates the job record, then sends the bucket
  * scanner actor a `PerformTargetScan` message.
  * Wrapped in `IsAdminAsync`.
  *
  * @param targetName bucket name of the scan target to scan
  * @return 200 once the message has been sent; otherwise the 404/500 produced by the lookup or job creation
  */
def manualTriggerAdditionScan(targetName:String) = IsAdminAsync { _=> _=>
  withLookupAsync(targetName) { scanTarget =>
    withNewScanJob(scanTarget,"AdditionScan") { scanJob =>
      val scanRequest = BucketScanner.PerformTargetScan(scanTarget, Some(scanJob))
      bucketScanner ! scanRequest
      Ok(GenericErrorResponse("ok", "scan started").asJson)
    }
  }
}
/**
  * Starts a "DeletionScan" job on the given scan target: creates the job record, then sends the bucket
  * scanner actor a `PerformDeletionScan` message carrying the job.
  * Wrapped in `IsAdminAsync`.
  *
  * @param targetName bucket name of the scan target to scan
  * @return 200 once the message has been sent; otherwise the 404/500 produced by the lookup or job creation
  */
def manualTriggerDeletionScan(targetName:String) = IsAdminAsync { _=> _=>
  withLookupAsync(targetName) { scanTarget =>
    withNewScanJob(scanTarget, "DeletionScan") { scanJob =>
      val scanRequest = BucketScanner.PerformDeletionScan(scanTarget, maybeJob=Some(scanJob))
      bucketScanner ! scanRequest
      Ok(GenericErrorResponse("ok", "scan started").asJson)
    }
  }
}
/**
  * Sends the legacy proxies scanner actor a `ScanBucket` message for the given scan target.
  * Wrapped in `IsAdmin`. No job record is created here.
  *
  * @param targetName bucket name of the scan target to scan
  * @return 200 once the message has been sent; otherwise the 404/500 produced by the lookup
  */
def scanForLegacyProxies(targetName:String) = IsAdmin { _=> _=>
  withLookup(targetName) { scanTarget =>
    val scanRequest = LegacyProxiesScanner.ScanBucket(scanTarget)
    proxyScanner ! scanRequest
    Ok(GenericErrorResponse("ok", "scan started").asJson)
  }
}
/**
  * Sends the bulk thumbnailer actor a `DoThumbnails` message for the given scan target.
  * Wrapped in `IsAdmin`. No job record is created here.
  *
  * @param targetName bucket name of the scan target to process
  * @return 200 once the message has been sent; otherwise the 404/500 produced by the lookup
  */
def genProxies(targetName:String) = IsAdmin { _=> _=>
  withLookup(targetName) { scanTarget =>
    val thumbnailRequest = new BulkThumbnailer.DoThumbnails(scanTarget)
    bulkThumbnailer ! thumbnailRequest
    Ok(GenericErrorResponse("ok", "proxy run started").asJson)
  }
}
/**
  * Asks the proxy framework to validate the configuration of the given scan target and, when the
  * request is accepted, records the returned job id as a pending job on the target.
  * Wrapped in `IsAdminAsync`.
  *
  * @param targetName bucket name of the scan target to check
  * @return 200 with the job id when the request was accepted and the target was saved;
  *         404/500 from the lookup; 500 if the proxy framework request or the save failed
  */
def initiateCheckJob(targetName:String) = IsAdminAsync { _=> _=>
  withLookupAsync(targetName){ tgt =>
    proxyGenerators.requestCheckJob(tgt.bucketName, tgt.proxyBucket, tgt.region).map({
      case Left(err) =>
        logger.error(s"Could not request check job for scan target $targetName: $err")
        InternalServerError(GenericErrorResponse("error", err).asJson)
      case Right(jobId) =>
        // use the same helper as withNewScanJob and createPipelines rather than re-implementing the append here
        val updatedTarget = tgt.withAnotherPendingJob(jobId)
        scanTargetDAO.put(updatedTarget) match {
          case Success(_) =>
            Ok(ObjectCreatedResponse("ok", "jobId", jobId).asJson)
          case Failure(err) =>
            logger.error(s"Could not record check job $jobId on scan target $targetName: ${err.getMessage}", err)
            InternalServerError(GenericErrorResponse("error", err.toString).asJson)
        }
    })
  }
}
/**
  * Asks the proxy framework to create pipelines for the given scan target and, when the request is
  * accepted, records the returned job id as a pending job on the target.
  * Wrapped in `IsAdminAsync`.
  *
  * @param targetName bucket name of the scan target to create pipelines for
  * @param force      passed straight through to `requestPipelineCreate`
  * @return 200 with the job id when the request was accepted and the target was saved;
  *         404/500 from the lookup; 500 if the proxy framework request or the save failed
  */
def createPipelines(targetName:String, force:Boolean) = IsAdminAsync { _=> _=>
  withLookupAsync(targetName){ scanTarget =>
    val pipelineRequest =
      proxyGenerators.requestPipelineCreate(scanTarget.bucketName, scanTarget.proxyBucket, scanTarget.region, force)

    pipelineRequest.map {
      case Right(jobId)=>
        scanTargetDAO.put(scanTarget.withAnotherPendingJob(jobId)) match {
          case Failure(saveError)=>
            InternalServerError(GenericErrorResponse("error", saveError.toString).asJson)
          case Success(_)=>
            Ok(ObjectCreatedResponse("ok","job",jobId).asJson)
        }
      case Left(problem)=>
        InternalServerError(GenericErrorResponse("error",problem).asJson)
    }
  }
}
/** Runs the notification configuration check for the given target with `shouldUpdate = true`. */
def fixNotificationConfiguration(targetName:String) =
  notificationConfiguration(targetName, shouldUpdate = true)
/** Runs the notification configuration check for the given target with `shouldUpdate = false`. */
def checkNotificationConfiguration(targetName:String) =
  notificationConfiguration(targetName, shouldUpdate = false)
/**
  * Verifies the bucket notification setup for the given scan target via
  * `bucketNotifications.verifyNotificationSetup`, passing the target's bucket name and region.
  * Wrapped in `IsAdmin`.
  *
  * @param targetName   bucket name of the scan target to check
  * @param shouldUpdate passed straight through to `verifyNotificationSetup`
  * @return 200 with a CheckNotificationResponse carrying the two values returned by the verification;
  *         404/500 from the lookup; 500 (after logging) if the verification failed
  */
def notificationConfiguration(targetName:String, shouldUpdate:Boolean) = IsAdmin { _=> _=>
  withLookup(targetName) { scanTarget=>
    val bucketRegion = Region.of(scanTarget.region)
    val verification = bucketNotifications.verifyNotificationSetup(scanTarget.bucketName, Some(bucketRegion), shouldUpdate)

    verification match {
      case Failure(problem)=>
        logger.error(s"Could not check notification configuration on target $targetName (${scanTarget.bucketName} in ${scanTarget.region}): ${problem.getClass.getCanonicalName} ${problem.getMessage}", problem)
        InternalServerError(GenericErrorResponse("error", problem.toString).asJson)
      case Success((updatesRequired, didUpdate))=>
        Ok(CheckNotificationResponse("ok",updatesRequired,didUpdate).asJson)
    }
  }
}
}