// app/controllers/LightboxController.scala (449 lines of code)

package controllers

import java.time.ZonedDateTime
import java.util.UUID
import akka.pattern.ask
import akka.actor.{ActorRef, ActorSystem}
import akka.stream.scaladsl.{Flow, GraphDSL, RunnableGraph}
import akka.stream.{ActorMaterializer, ClosedShape, Materializer}
import auth.{BearerTokenAuth, Security, UserRequest}
import com.google.inject.Injector
import org.scanamo.DynamoReadError
import com.theguardian.multimedia.archivehunter.common.{ArchiveEntry, Indexer, StorageClass, ZonedDateTimeEncoder}
import com.theguardian.multimedia.archivehunter.common.clientManagers.{DynamoClientManager, ESClientManager, S3ClientManager}
import com.theguardian.multimedia.archivehunter.common.cmn_models._
import helpers.LightboxStreamComponents.{BulkRestoreStatsSink, ExtractArchiveEntry, InitiateRestoreSink, LightboxDynamoSource, LookupArchiveEntryFromLBEntryFlow, LookupLightboxEntryFlow, UpdateLightboxIndexInfoSink}
import helpers.{LightboxHelper, UserAvatarHelper}
import javax.inject.{Inject, Named, Singleton}
import play.api.{Configuration, Logger}
import play.api.libs.circe.Circe
import play.api.mvc.{AbstractController, ControllerComponents, Request, Result}
import responses._
import io.circe.syntax._
import io.circe.generic.auto._
import models.{ServerTokenDAO, ServerTokenEntry, UserProfile, UserProfileDAO}
import requests.SearchRequest
import services.GlacierRestoreActor
import services.GlacierRestoreActor.GRMsg
import scala.concurrent.duration._
import scala.concurrent.{ExecutionContext, Future}
import scala.util.{Failure, Success, Try}
import helpers.S3Helper.getPresignedURL
import play.api.cache.SyncCacheApi
import auth.ClaimsSetExtensions._
import org.slf4j.LoggerFactory
import software.amazon.awssdk.regions.Region

/**
  * Controller for "lightbox" operations - a user's personal collection of archive items,
  * individual or bulk, including initiating and checking Glacier restores, generating
  * presigned download links and short-lived bulk-download tokens.
  *
  * All endpoints are authenticated via the `Security` trait (`IsAuthenticatedAsync` /
  * `IsAdminAsync`); most resolve the acting user's profile through `withTargetUserProfile`.
  */
@Singleton
class LightboxController @Inject() (override val config:Configuration,
                                    override val controllerComponents:ControllerComponents,
                                    override val bearerTokenAuth: BearerTokenAuth,
                                    override val cache:SyncCacheApi,
                                    esClientMgr:ESClientManager,
                                    @Named("glacierRestoreActor") glacierRestoreActor:ActorRef,
                                    lightboxBulkEntryDAO: LightboxBulkEntryDAO,
                                    serverTokenDAO: ServerTokenDAO,
                                    dynamoClientManager:DynamoClientManager,
                                    userAvatarHelper:UserAvatarHelper)
                                   (implicit val system:ActorSystem,
                                    mat:Materializer,
                                    injector:Injector,
                                    s3ClientMgr:S3ClientManager,
                                    lightboxEntryDAO: LightboxEntryDAO,
                                    userProfileDAO: UserProfileDAO)
  extends AbstractController(controllerComponents) with Security with Circe with ZonedDateTimeEncoder with RestoreStatusEncoder {

  override protected val logger=LoggerFactory.getLogger(getClass)

  //implicit so it is picked up by helpers (e.g. LightboxHelper) that need an Indexer in scope
  private implicit val indexer = new Indexer(config.get[String]("externalData.indexName"))
  private val awsProfile = config.getOptional[String]("externalData.awsProfile")
  private implicit val esClient = esClientMgr.getClient()
  private implicit val ec:ExecutionContext = controllerComponents.executionContext
  private val indexName = config.get[String]("externalData.indexName")

  //default ask timeout for actor round-trips; checkRestoreStatus shadows this with 60s locally
  implicit val timeout:akka.util.Timeout = 30 seconds

  // NOTE(review): the comment says "2 hours" but the fallback value is 10; units are whatever
  // ServerTokenEntry.create's `duration` parameter expects - confirm against that definition.
  val tokenShortDuration = config.getOptional[Int]("serverToken.shortLivedDuration").getOrElse(10) //default value is 2 hours
  val defaultLinkExpiry = 1800 //links expire after 30 minutes
  val defaultRegion = Region.of(config.get[String]("externalData.awsRegion"))

  /**
    * looks up the profile of the user the request is acting on behalf of and runs `block` with it,
    * converting "no session" to a 400 and a corrupted session to a 500.
    * @param request incoming request, used to locate the session
    * @param user username that the operation targets
    * @param block continuation to run with the resolved profile
    */
  def withTargetUserProfile[T](request:Request[T], user:String)(block: (UserProfile=>Future[Result])) =
    targetUserProfile(request, user).flatMap({
      case None => Future(BadRequest(GenericErrorResponse("session_error", "no session present").asJson))
      case Some(Left(err)) =>
        logger.error(s"Session is corrupted: ${err.toString}")
        Future(InternalServerError(GenericErrorResponse("session_error", "session is corrupted, log out and log in again").asJson))
      case Some(Right(profile)) =>
        block(profile)
    })

  /**
    * removes a single item from the given user's lightbox: strips the user's lightbox marker from the
    * index entry and deletes the corresponding LightboxEntry row. Both updates run concurrently;
    * any failures are aggregated into a 500 response.
    */
  def removeFromLightbox(user:String, fileId:String) = IsAuthenticatedAsync { claims=> request=>
    withTargetUserProfile(request, user) { profile=>
      val indexUpdateFuture = indexer.getById(fileId).flatMap(indexEntry => {
        //drop only this user's lightbox marker; other users' markers on the item are retained
        val updatedEntry = indexEntry.copy(lightboxEntries = indexEntry.lightboxEntries.filter(_.owner!=profile.userEmail))
        indexer.indexSingleItem(updatedEntry, Some(updatedEntry.id))
      })
      //normalise the delete result into Success/Failure so it can be aggregated with the index update
      val lbUpdateFuture = lightboxEntryDAO.delete(profile.userEmail, fileId)
        .map(result=>Success(result.toString))
        .recoverWith({
          case err:Throwable=>Future(Failure(err))
        })
      Future.sequence(Seq(indexUpdateFuture,lbUpdateFuture)).map(results=>{
        val errors = results.collect({case Failure(err)=>err})
        if(errors.nonEmpty){
          errors.foreach(err=>logger.error("Could not remove from lightbox", err))
          InternalServerError(ObjectListResponse("error","errors",errors.map(_.toString), errors.length).asJson)
        } else {
          Ok(GenericErrorResponse("ok","removed").asJson)
        }
      })
    }
  }

  /**
    * persists the given bulk entry then adds every item matching the search to it, saving the
    * updated (post-add) bulk entry afterwards. Returns 500 if the entry could not be read or saved.
    * @param maybeBulkEntry bulk entry to use, or the Dynamo read error that occurred looking it up
    * @param searchReq search whose results should be added to the bulk
    * @param userProfile profile of the user the bulk belongs to
    * @param user target username (currently unused in the body)
    */
  private def saveAndStartRestore(maybeBulkEntry:Either[DynamoReadError, LightboxBulkEntry], searchReq:SearchRequest, userProfile:UserProfile, user:String) = maybeBulkEntry match {
    case Left(err)=>
      logger.error(s"Could not get bulk restore entries: ${err.toString}")
      Future(InternalServerError(GenericErrorResponse("error", err.toString).asJson))
    case Right(entry)=>
      logger.info(s"Got bulk restore entry: $entry")
      lightboxBulkEntryDAO.put(entry)
        .flatMap(savedEntry=>{
          LightboxHelper
            .addToBulkFromSearch(indexName, userProfile, userAvatarHelper.getAvatarLocationString(userProfile.userEmail), searchReq, savedEntry)
            .flatMap(updatedBulkEntry => {
              //persist the updated counts etc. on the bulk entry after the add completed
              lightboxBulkEntryDAO.put(updatedBulkEntry).map(_=>{
                Ok(ObjectCreatedResponse("ok", "bulkLightboxEntry", updatedBulkEntry.id).asJson)
              })
            }).recover({
              case err: Throwable =>
                logger.error("Could not save lightbox entry: ", err)
                InternalServerError(GenericErrorResponse("error", err.toString).asJson)
            })
        }).recover({
          case err:Throwable=>
            logger.error(s"Could not save bulk restore entry: $err")
            InternalServerError(GenericErrorResponse("db_error",err.toString).asJson)
        })
  }

  /**
    * either picks up the user's existing bulk entry for this collection:path description
    * or creates a new (unsaved) one.
    * NOTE(review): `searchReq.collection.get` will throw if collection is absent - callers
    * appear to guarantee it is set; confirm.
    */
  private def getOrCreateBulkEntry(searchReq:SearchRequest, userProfile:UserProfile, user:String) = {
    //either pick up an existing bulk entry or create a new one
    val bulkDesc = s"${searchReq.collection.get}:${searchReq.path.getOrElse("none")}"
    lightboxBulkEntryDAO.entryForDescAndUser(userProfile.userEmail, bulkDesc)
      .map(_.map({
        case Some(entry)=>entry
        case None=>LightboxBulkEntry.create(userProfile.userEmail, bulkDesc)
      }))
  }

  /**
    * bulk-adds everything matching the posted SearchRequest to the user's lightbox.
    * Rejects with 413 if the total size exceeds the user's quota; otherwise finds or creates
    * the matching bulk entry and kicks off the add/restore via saveAndStartRestore.
    */
  def addFromSearch(user:String) = IsAuthenticatedAsync(circe.json(2048)) { claims=> request=>
    request.body.as[SearchRequest].fold(
      err=> Future(BadRequest(GenericErrorResponse("bad_request", err.toString).asJson)),
      searchReq=>
        withTargetUserProfile(request, user) { userProfile=>
          LightboxHelper.testBulkAddSize(indexName ,userProfile, searchReq).flatMap({
            case Left(resp:QuotaExceededResponse)=>
              //413 = Payload Too Large, used here to signal "restore too big for quota"
              Future(new Status(413)(resp.asJson))
            case Right(restoreSize)=>
              logger.info(s"Proceeding with bulk restore of size $restoreSize")
              for {
                maybeLightboxBulkEntry <- getOrCreateBulkEntry(searchReq, userProfile, user)
                response <- saveAndStartRestore(maybeLightboxBulkEntry, searchReq, userProfile, user)
              } yield response
          }).recover({
            case err:Throwable=>
              logger.error("Could not test bulk add size: ", err)
              InternalServerError(GenericErrorResponse("error",err.toString).asJson)
          })
        }
    )
  }

  /**
    * adds a single item to the user's lightbox: saves a LightboxEntry and marks the index
    * record as lightboxed (both concurrently). If the item is in Glacier, fires off a restore
    * request to the glacier restore actor (fire-and-forget).
    */
  def addToLightbox(user:String, fileId:String) = IsAuthenticatedAsync { claims=> request=>
    withTargetUserProfile(request, user) { userProfile=>
      indexer.getById(fileId).flatMap(indexEntry =>
        Future.sequence(Seq(
          LightboxHelper.saveLightboxEntry(userProfile, indexEntry, None),
          LightboxHelper.updateIndexLightboxed(userProfile, userAvatarHelper.getAvatarLocationString(userProfile.userEmail), indexEntry, None)
        )).map(results=>{
          val errors = results.collect({case Failure(err)=>err})
          if(errors.nonEmpty){
            errors.foreach(err=>logger.error("Could not create lightbox entry", err))
            InternalServerError(ObjectListResponse("error","errors",errors.map(_.toString), errors.length).asJson)
          } else {
            // NOTE(review): unsafe cast + .get; relies on saveLightboxEntry being first in the Seq
            // and on the no-errors check above having proven it succeeded
            val lbEntry = results.head.asInstanceOf[Try[LightboxEntry]].get
            if(indexEntry.storageClass==StorageClass.GLACIER){
              glacierRestoreActor ! GlacierRestoreActor.InitiateRestore(indexEntry, lbEntry, None) //use default expiration
            }
            Ok(GenericErrorResponse("ok","saved").asJson)
          }
        })
      )
    }
  }

  /**
    * returns all of the given user's lightbox entries, keyed by fileId for easy frontend consumption.
    */
  def lightboxDetails(user:String) = IsAuthenticatedAsync { claims=> request=>
    withTargetUserProfile(request, user) { userProfile=>
      lightboxEntryDAO.allForUser(userProfile.userEmail).map(results => {
        val errors = results.collect({ case Left(err) => err })
        if (errors.nonEmpty) {
          errors.foreach(err => logger.error(s"Could not retrieve lightbox details: ${err.toString}"))
          InternalServerError(ObjectListResponse("db_error", "error", errors.map(_.toString), errors.length).asJson)
        } else {
          //it's easier for the frontend to consume this if we convert to a mapping here
          val finalResult = results.collect({ case Right(entry) => entry }).map(entry => Tuple2(entry.fileId, entry)).toMap
          Ok(ObjectListResponse("ok", "lightboxEntry", finalResult, results.length).asJson)
        }
      })
    }
  }

  /**
    * returns a presigned URL to download the requested media file. You must be logged in for this to work (obviously!)
    * @param fileId
    * @return
    */
  def getDownloadLink(fileId:String) = IsAuthenticatedAsync { claims=> request=>
    userProfileFromSession(request.session) match {
      case None=>Future(BadRequest(GenericErrorResponse("session_error","no session present").asJson))
      case Some(Left(err))=>
        logger.error(s"Session is corrupted: ${err.toString}")
        Future(InternalServerError(GenericErrorResponse("session_error","session is corrupted, log out and log in again").asJson))
      case Some(Right(userProfile))=>
        indexer.getById(fileId).map(archiveEntry=>{
          //only allow the download if the user can see the collection (bucket) the item lives in
          if(userProfile.allCollectionsVisible || userProfile.visibleCollections.contains(archiveEntry.bucket)){
            getPresignedURL(archiveEntry, defaultLinkExpiry, defaultRegion, awsProfile) match {
              case Success(url)=>
                Ok(ObjectGetResponse("ok","link",url.toString).asJson)
              case Failure(ex)=>
                logger.error("Could not generate presigned s3 url: ", ex)
                InternalServerError(GenericErrorResponse("error",ex.toString).asJson)
            }
          } else {
            Forbidden(GenericErrorResponse("forbidden", "You don't have access to the right catalogue to do this").asJson)
          }
        })
    }
  }

  /**
    * asks the glacier restore actor for the restore status of a single lightboxed item and maps
    * the actor's reply onto a RestoreStatusResponse. 404 if the item is not in the user's lightbox.
    */
  def checkRestoreStatus(user:String, fileId:String) = IsAuthenticatedAsync { claims=> request=>
    //status checks can be slow, so shadow the class-level 30s ask timeout with a longer one
    implicit val timeout:akka.util.Timeout = 60 seconds
    withTargetUserProfile(request, user) { userProfile=>
      lightboxEntryDAO.get(userProfile.userEmail, fileId).flatMap({
        case None=>
          Future(NotFound(GenericErrorResponse("not_found","This item is not in your lightbox").asJson))
        case Some(Left(err))=>
          Future(InternalServerError(GenericErrorResponse("db_error", err.toString).asJson))
        case Some(Right(lbEntry))=>
          val response = (glacierRestoreActor ? GlacierRestoreActor.CheckRestoreStatus(lbEntry)).mapTo[GlacierRestoreActor.GRMsg]
          response.map({
            case GlacierRestoreActor.ItemLost(_)=>
              NotFound(GenericErrorResponse("not_found","Item must have gone away").asJson)
            case GlacierRestoreActor.NotInArchive(entry)=>
              Ok(RestoreStatusResponse("ok",entry.id, RestoreStatus.RS_UNNEEDED, None, None).asJson)
            case GlacierRestoreActor.RestoreCompleted(entry, expiry)=>
              Ok(RestoreStatusResponse("ok", entry.id, RestoreStatus.RS_SUCCESS, Some(expiry), None).asJson)
            case GlacierRestoreActor.RestoreInProgress(entry)=>
              Ok(RestoreStatusResponse("ok", entry.id, RestoreStatus.RS_UNDERWAY, None, None).asJson)
            case GlacierRestoreActor.RestoreNotRequested(entry)=>
              //an un-requested restore only matters if the item is actually in Glacier
              val restoreStatus = if(entry.storageClass==StorageClass.GLACIER) {
                RestoreStatus.RS_EXPIRED
              } else {
                RestoreStatus.RS_UNNEEDED
              }
              Ok(RestoreStatusResponse("not_requested", entry.id, restoreStatus, None, None).asJson)
          })
      })
    }
  }

  /**
    * checks the restore status of everything in the given bulk and returns a set of stats
    * @param user user whose lightbox we are checking
    * @param bulkId bulk ID we are checking
    * @return
    */
  def bulkCheckRestoreStatus(user:String, bulkId:String) = IsAuthenticatedAsync { claims=> request=>
    withTargetUserProfile(request, user) { userProfile=>
      import akka.stream.scaladsl.GraphDSL.Implicits._
      //stream every lightbox entry for the bulk out of Dynamo straight into the stats sink
      val sinkFactory = injector.getInstance(classOf[BulkRestoreStatsSink])
      val graph = RunnableGraph.fromGraph(GraphDSL.create(sinkFactory) { implicit builder => sink=>
        val src = new LightboxDynamoSource(bulkId, config, dynamoClientManager)
        val actualSrc = builder.add(src)
        actualSrc ~> sink
        ClosedShape
      })
      graph.run().map(stats=>{
        Ok(ObjectGetResponse("ok","restore_stats", stats).asJson)
      }).recover({
        case ex:Throwable=>
          logger.error(s"Could not run stream to check bulk restore status: ", ex)
          InternalServerError(GenericErrorResponse("error", ex.toString).asJson)
      })
    }
  }

  /**
    * returns bulk entries for the current user
    * @return
    */
  def myBulks(user:String) = IsAuthenticatedAsync { username=> request=>
    import cats.implicits._
    withTargetUserProfile(request, user) { profile=>
      lightboxBulkEntryDAO.entriesForUser(profile.userEmail).flatMap(results=>{
        val failures = results.collect({ case Left(err)=>err })
        if(failures.nonEmpty){
          Future(InternalServerError(GenericErrorResponse("error",failures.map(_.toString).mkString(",")).asJson))
        } else {
          //append a synthetic "loose" bulk whose count is the number of un-bulked lightbox items.
          //.sequence (cats) turns Option[Future[...]] into Future[Option[...]]
          request.session.get("username")
            .map(userEmail=>LightboxHelper.getLooseCountForUser(indexName, userEmail))
            .sequence
            .map({
              case Some(Left(err))=>
                //count lookup failed - still return the bulks, with a loose count of zero
                logger.error(s"Could not look up count for loose lightbox items: $err")
                val successes = results.collect({ case Right(value)=>value }) ++ List(LightboxBulkEntry.forLoose(profile.userEmail, 0))
                Ok(ObjectListResponse("ok","lightboxBulk",successes,successes.length).asJson)
              case Some(Right(count))=>
                val successes = results.collect({ case Right(value)=>value }) ++ List(LightboxBulkEntry.forLoose(profile.userEmail, count))
                Ok(ObjectListResponse("ok","lightboxBulk",successes,successes.length).asJson)
              case None=>
                BadRequest(GenericErrorResponse("auth_problem", "User information has no email address").asJson)
            })
        }
      })
    }
  }

  /**
    * deletes a bulk entry and removes its contents from the lightbox. Only the bulk's owner
    * or an admin may do this; anyone else gets a 403.
    */
  def deleteBulk(user:String, entryId:String) = IsAuthenticatedAsync { claims=> request=>
    withTargetUserProfile(request, user) { profile=>
      lightboxBulkEntryDAO.entryForId(UUID.fromString(entryId)).flatMap({
        case None=>
          Future(NotFound(GenericErrorResponse("not_found","No bulk with that ID is present").asJson))
        case Some(Right(entry))=>
          if(entry.userEmail==profile.userEmail || profile.isAdmin) {
            logger.info(s"Removing bulk entries for request $entry")
            //remove the contents first, then the bulk record itself
            LightboxHelper.removeBulkContents(indexName, profile, entry).flatMap(count=> {
              logger.info(s"Deleting bulk request $entry")
              lightboxBulkEntryDAO.delete(entryId).map(_ => {
                Ok(GenericErrorResponse("ok", "item deleted").asJson)
              })
            }).recover({
              case err: Throwable =>
                logger.error("Could not delete record from dynamo: ", err)
                InternalServerError(GenericErrorResponse("db_error", err.toString).asJson)
            })
          } else {
            Future(Forbidden(GenericErrorResponse("forbidden", "You don't have permission to do this, please contact your administrator").asJson))
          }
        case Some(Left(err))=>
          logger.error(s"Could not look up bulk entry in dynamo: ${err.toString}")
          Future(InternalServerError(GenericErrorResponse("db_error",err.toString).asJson))
      })
    }
  }

  /**
    * check whether there is a bulk entry for the given collection and path, for the requesting user.
    * if nothing is found, a 200 response is still returned, but with a null in the entry field.
    * @return
    */
  def haveBulkEntryFor(user:String) = IsAuthenticatedAsync(circe.json(2048)) { username=> request=>
    request.body.as[SearchRequest].fold(
      err=>Future(BadRequest(GenericErrorResponse("bad_request", err.toString).asJson)),
      rq=>{
        if(rq.path.isDefined && rq.collection.isDefined) {
          val desc = s"${rq.collection.get}:${rq.path.get}"
          // NOTE(review): this passes the authenticated `username` rather than the `user` path
          // parameter used elsewhere in this controller - confirm that is intentional
          lightboxBulkEntryDAO.entryForDescAndUser(username,desc).map({
            case Left(err)=>InternalServerError(GenericErrorResponse("db_error", err.toString).asJson)
            case Right(Some(entry))=>Ok(ObjectGetResponse("ok","lightboxbulk",entry.id).asJson)
            case Right(None)=>Ok(ObjectGetResponseEmpty("notfound","lightboxbulk").asJson)
          })
        } else {
          Future(BadRequest(GenericErrorResponse("bad_request","You must set path and collection").asJson))
        }
      }
    )
  }

  /**
    * creates and persists a short-lived server token tied to the given bulk entry and user,
    * responding with an "archivehunter:bulkdownload:" URI that the desktop app consumes.
    */
  private def makeDownloadToken(entryId:String, userEmail:String) = {
    val token = ServerTokenEntry.create(associatedId = Some(entryId),duration=tokenShortDuration, forUser = Some(userEmail))
    serverTokenDAO.put(token).map(_=> {
      Ok(ObjectCreatedResponse("ok", "link", s"archivehunter:bulkdownload:${token.value}").asJson)
    }).recover({
      case err:Throwable =>
        logger.error(s"Could not save token to database: $err")
        InternalServerError(GenericErrorResponse("db_error", "Could not save token, see logs").asJson)
    })
  }

  /**
    * issues a bulk-download token for the given bulk entry (or the special "loose" pseudo-bulk,
    * which skips the existence check). 404 if the bulk does not exist.
    */
  def bulkDownloadInApp(entryId:String) = IsAuthenticatedAsync { username=> request=>
    implicit val lightboxEntryDAOImpl = lightboxEntryDAO
    userProfileFromSession(request.session) match {
      case None=>Future(BadRequest(GenericErrorResponse("session_error","no session present").asJson))
      case Some(Left(err))=>
        logger.error(s"Session is corrupted: ${err.toString}")
        Future(InternalServerError(GenericErrorResponse("session_error","session is corrupted, log out and log in again").asJson))
      case Some(Right(profile))=>
        if(entryId=="loose"){
          //"loose" items are not backed by a bulk record, so no existence check is possible
          makeDownloadToken(entryId, username)
        } else {
          lightboxBulkEntryDAO.entryForId(UUID.fromString(entryId)).flatMap({
            case None =>
              Future(NotFound(GenericErrorResponse("not_found", "No bulk with that ID is present").asJson))
            case Some(Right(_)) =>
              //create a token that is valid for 10 seconds
              makeDownloadToken(entryId, username)
            case Some(Left(err)) =>
              logger.error(s"Could not look up bulk entry in dynamo: ${err.toString}")
              Future(InternalServerError(GenericErrorResponse("db_error", err.toString).asJson))
          })
        }
    }
  }

  /**
    * re-triggers a Glacier restore for a single lightboxed item, enforcing the user's
    * per-restore quota (file size in Mb must be below the quota; users with no quota set
    * are refused). Waits for the actor's success/failure reply before responding.
    */
  def redoRestore(user:String, fileId:String) = IsAuthenticatedAsync { claims=> request=>
    targetUserProfile(request, user).flatMap({
      case None=>Future(BadRequest(GenericErrorResponse("session_error","no session present").asJson))
      case Some(Left(err))=>
        logger.error(s"Session is corrupted: ${err.toString}")
        Future(InternalServerError(GenericErrorResponse("session_error","session is corrupted, log out and log in again").asJson))
      case Some(Right(profile))=>
        // NOTE(review): Future.sequence over futures of different result types, recovered by
        // positional asInstanceOf casts below - fragile if the Seq order ever changes
        Future.sequence(Seq(
          lightboxEntryDAO.get(profile.userEmail, fileId),
          indexer.getById(fileId))).flatMap(results=>{
          val archiveEntry = results(1).asInstanceOf[ArchiveEntry]
          val lbEntryResponse = results.head.asInstanceOf[Option[Either[DynamoReadError, LightboxEntry]]]
          lbEntryResponse match {
            case Some(Right(lbEntry)) =>
              //quota is in Mb; 1048576 = bytes per Mb
              if (profile.perRestoreQuota.isDefined && (archiveEntry.size/1048576L) < profile.perRestoreQuota.get)
                (glacierRestoreActor ? GlacierRestoreActor.InitiateRestore(archiveEntry, lbEntry, None)).mapTo[GRMsg].map({
                  case GlacierRestoreActor.RestoreSuccess =>
                    //NOTE(review): typo "initiaited" in the user-facing message, left as-is
                    Ok(GenericErrorResponse("ok", "restore initiaited").asJson)
                  case GlacierRestoreActor.RestoreFailure(err) =>
                    logger.error(s"Could not redo restore for $archiveEntry", err)
                    InternalServerError(GenericErrorResponse("error", err.toString).asJson)
                })
              else {
                profile.perRestoreQuota match {
                  case Some(userQuota)=>logger.warn(s"Can't restore $fileId: user's quota of $userQuota Mb is less than file size of ${archiveEntry.size/1048576L}Mb")
                  case None=>logger.warn(s"Can't restore $fileId: user has no quota")
                }
                Future(Forbidden(GenericErrorResponse("quota_exceeded", "This restore would exceed your quota").asJson))
              }
            case Some(Left(error)) =>
              Future(InternalServerError(GenericErrorResponse("db_error", error.toString).asJson))
            case None =>
              Future(InternalServerError(GenericErrorResponse("integrity_error", s"No lightbox entry available for file $fileId").asJson))
          }
        })
    })
  }

  /**
    * streams every entry of the given bulk through a lookup/extract pipeline into a sink that
    * re-writes the lightbox info on the index records, returning the number of items updated.
    * Used to repair index records that have drifted out of sync with the lightbox tables.
    */
  def verifyBulkLightbox(user:String, bulkId:String) = IsAuthenticatedAsync { claims=> request=>
    withTargetUserProfile(request, user) { userProfile =>
      val sinkFactory = new UpdateLightboxIndexInfoSink(bulkId, userProfile, userAvatarHelper.getAvatarLocationString(user))
      val graph = RunnableGraph.fromGraph(GraphDSL.create(sinkFactory) { implicit builder=> sink=>
        import akka.stream.scaladsl.GraphDSL.Implicits._
        //dynamo rows -> archive entry lookup -> bare ArchiveEntry -> index-update sink
        val src = new LightboxDynamoSource(bulkId, config, dynamoClientManager)
        val lbConverter = injector.getInstance(classOf[LookupArchiveEntryFromLBEntryFlow])
        val actualSource = builder.add(src)
        val actualConverter = builder.add(lbConverter)
        val actualExtractor = builder.add(new ExtractArchiveEntry)
        actualSource ~> actualConverter ~> actualExtractor ~> sink.in
        ClosedShape
      })
      val streamResult = graph.run()
      streamResult.map(foundItemCount=>{
        Ok(CountResponse("ok","updated items", foundItemCount).asJson)
      }).recover({
        case err:Throwable=>
          logger.error("Could not run stream to fix lightbox entries: ", err)
          InternalServerError(GenericErrorResponse("error",err.toString).asJson)
      })
    }
  }

  //this is temporary and will be replaced in the update that brings in the new approval workflow
  def redoBulk(user:String, bulkId:String) = IsAdminAsync { claims=> request =>
    logger.info("in redoBulk")
    targetUserProfile(request, user).flatMap({
      case None =>
        Future(BadRequest(GenericErrorResponse("session_error", "no session present").asJson))
      case Some(Left(err)) =>
        logger.error(s"Session is corrupted: ${err.toString}")
        Future(InternalServerError(GenericErrorResponse("session_error", "session is corrupted, log out and log in again").asJson))
      case Some(Right(profile)) =>
        logger.info(s"running with user profile $profile")
        //stream every entry of the bulk through an archive-entry lookup into a sink that
        //re-initiates the restore for each item, counting how many were processed
        val sink = injector.getInstance(classOf[InitiateRestoreSink])
        val graph = RunnableGraph.fromGraph(GraphDSL.create(sink) { implicit builder => resultSink =>
          import akka.stream.scaladsl.GraphDSL.Implicits._
          val source = new LightboxDynamoSource(bulkId,config, dynamoClientManager)
          val lookup = injector.getInstance(classOf[LookupArchiveEntryFromLBEntryFlow])
          val actualSource = builder.add(source)
          val actualLookup = builder.add(lookup)
          actualSource ~> actualLookup ~> resultSink
          ClosedShape
        })
        val streamResult = graph.run()
        streamResult.map(processedCount => {
          logger.info(s"bulk redo completed, processsed $processedCount items")
          Ok(CountResponse("ok", "triggered re-restore of items", processedCount).asJson)
        })
    })
  }
}