app/controllers/BulkDownloadsController.scala (256 lines of code) (raw):
package controllers
import akka.NotUsed
import java.util.UUID
import akka.actor.{ActorRef, ActorSystem}
import akka.stream.{ActorMaterializer, Materializer}
import akka.stream.scaladsl.{Framing, Keep, Sink, Source}
import com.theguardian.multimedia.archivehunter.common.cmn_models.{LightboxBulkEntry, LightboxBulkEntryDAO, LightboxEntryDAO, RestoreStatus, RestoreStatusEncoder}
import javax.inject.{Inject, Named, Singleton}
import models.{ArchiveEntryDownloadSynopsis, ServerTokenDAO, ServerTokenEntry}
import play.api.{Configuration, Logger}
import play.api.libs.circe.Circe
import play.api.mvc.{AbstractController, ControllerComponents, ResponseHeader, Result}
import responses.{BulkDownloadInitiateResponse, GenericErrorResponse, ObjectGetResponse, RestoreStatusResponse}
import io.circe.generic.auto._
import io.circe.syntax._
import com.sksamuel.elastic4s.circe._
import com.theguardian.multimedia.archivehunter.common.{ArchiveEntry, ArchiveEntryHitReader, Indexer, StorageClass, ZonedDateTimeEncoder}
import com.theguardian.multimedia.archivehunter.common.clientManagers.{ESClientManager, S3ClientManager}
import helpers.S3Helper.getPresignedURL
import helpers.{LightboxHelper, SearchHitToArchiveEntryFlow}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.Future
import scala.util.{Failure, Success}
import akka.pattern.ask
import akka.util.ByteString
import auth.{BearerTokenAuth, Security}
import com.sksamuel.elastic4s.streams.ScrollPublisher
import org.slf4j.LoggerFactory
import play.api.cache.SyncCacheApi
import play.api.http.HttpEntity
import services.GlacierRestoreActor
import software.amazon.awssdk.regions.Region
import scala.concurrent.duration._
@Singleton
class BulkDownloadsController @Inject()(override val config:Configuration,
override val cache:SyncCacheApi,
serverTokenDAO: ServerTokenDAO,
lightboxBulkEntryDAO: LightboxBulkEntryDAO,
lightboxEntryDAO: LightboxEntryDAO,
esClientManager: ESClientManager,
cc:ControllerComponents,
override val bearerTokenAuth:BearerTokenAuth,
@Named("glacierRestoreActor") glacierRestoreActor:ActorRef,
)(implicit system:ActorSystem, mat:Materializer,s3ClientManager: S3ClientManager)
extends AbstractController(cc) with Security with Circe with ArchiveEntryHitReader with ZonedDateTimeEncoder with RestoreStatusEncoder {
override protected val logger=LoggerFactory.getLogger(getClass)
import com.sksamuel.elastic4s.http.ElasticDsl._
import com.sksamuel.elastic4s.streams.ReactiveElastic._
val indexName = config.getOptional[String]("externalData.indexName").getOrElse("archivehunter")
val tokenLongDuration = config.getOptional[Int]("serverToken.longLivedDuration").getOrElse(7200) //default value is 2 hours
private val profileName = config.getOptional[String]("externalData.awsProfile")
protected implicit val esClient = esClientManager.getClient()
private val indexer = new Indexer(config.get[String]("externalData.indexName"))
val defaultLinkExpiry = 1800 //links expire after 30 minutes
val defaultRegion = Region.of(config.get[String]("externalData.awsRegion"))
private def errorResponse(updatedToken:ServerTokenEntry) = serverTokenDAO
.put(updatedToken)
.map(saveResult=>{
logger.error("generic error response for token")
Forbidden(GenericErrorResponse("forbidden","invalid or expired token").asJson)
}).recover({
case err:Throwable=>
logger.error(s"Could not update token: ", err)
Forbidden(GenericErrorResponse("forbidden","invalid or expired token").asJson)
})
/**
* bring back a list of all entries for the given bulk.
* @param bulkEntry bulkEntry identifying the objects to query
* @return a Future, containing a sequence of ArchiveEntryDownloadSynopsis
*/
protected def entriesForBulk(bulkEntry:LightboxBulkEntry, maybeFrom:Option[Int]=None, maybeLimit:Option[Int]=None) = {
val query = LightboxHelper.lightboxSearch(indexName, Some(bulkEntry.id), bulkEntry.userEmail) sortBy fieldSort("path.keyword") scroll "5m"
val finalQuery = if(maybeFrom.isDefined && maybeLimit.isDefined) {
query.from(maybeFrom.get).limit(maybeLimit.get)
} else {
query
}
val source = Source.fromPublisher(esClient.publisher(finalQuery))
val hitConverter = new SearchHitToArchiveEntryFlow()
val sink = Sink.fold[Seq[ArchiveEntryDownloadSynopsis], ArchiveEntryDownloadSynopsis](Seq())({ (acc,entry)=>
acc ++ Seq(entry)
})
source
.via(hitConverter)
.map(entry=>ArchiveEntryDownloadSynopsis.fromArchiveEntry(entry))
.toMat(sink)(Keep.right)
.run()
}
protected def getSearchSource(p:ScrollPublisher):Source[ArchiveEntry,NotUsed] = {
val source = Source.fromPublisher(p)
val hitConverter = new SearchHitToArchiveEntryFlow()
source
.via(hitConverter)
}
/**
* creates a stream that yields ArchiveentryDownloadSynopsis objects as an NDJSON stream
* @param bulkEntry
* @return
*/
protected def streamingEntriesForBulk(bulkEntry:LightboxBulkEntry) = {
val query = LightboxHelper.lightboxSearch(indexName, Some(bulkEntry.id), bulkEntry.userEmail) sortBy fieldSort("path.keyword") scroll "5m"
getSearchSource(esClient.publisher(query))
.map(entry=>ArchiveEntryDownloadSynopsis.fromArchiveEntry(entry))
.map(_.asJson)
.map(_.noSpaces + "\n")
.map(ByteString.apply)
}
protected def saveTokenOnly(updatedToken:ServerTokenEntry, bulkEntry: LightboxBulkEntry) = serverTokenDAO
.put(updatedToken)
.flatMap(_=>{
val retrievalToken = ServerTokenEntry.create(duration = tokenLongDuration, forUser = updatedToken.createdForUser, associatedId = updatedToken.associatedId) //create a 2 hour token to cover the download.
serverTokenDAO.put(retrievalToken).map(_=> {
Ok(BulkDownloadInitiateResponse("ok", bulkEntry, retrievalToken.value, None).asJson)
}).recover({
case err:Throwable=>
logger.error(s"Could not save retrieval token: ${err.getMessage}", err)
InternalServerError(GenericErrorResponse("db_error", s"Could not save retrieval token, see logs").asJson)
})
}).recover({
case err:Throwable=>
logger.error(s"Could not search index for bulk entries: ${err.getMessage}", err)
Forbidden(GenericErrorResponse("forbidden", "invalid or expired token").asJson)
})
protected def saveTokenAndGetDownload(updatedToken:ServerTokenEntry, bulkEntry: LightboxBulkEntry) = serverTokenDAO
.put(updatedToken)
.flatMap(_=>{
entriesForBulk(bulkEntry).flatMap(results=> {
val retrievalToken = ServerTokenEntry.create(duration = tokenLongDuration, forUser = updatedToken.createdForUser) //create a 2 hour token to cover the download.
serverTokenDAO.put(retrievalToken).map(_=> {
Ok(BulkDownloadInitiateResponse("ok", bulkEntry, retrievalToken.value, Some(results)).asJson)
}).recover({
case err:Throwable=>
logger.error(s"Could not save retrieval token: ${err.getMessage}", err)
InternalServerError(GenericErrorResponse("db_error", s"Could not save retrieval token, see logs").asJson)
})
}).recoverWith({
case err:Throwable=>
logger.error(s"Could not search index for bulk entries: $err")
Future(Forbidden(GenericErrorResponse("forbidden", "invalid or expired token").asJson))
})
})
/**
* take a one-time code generated in LightboxController, validate and expire it, then return the information of the
* associated LightboxBulkEntry
* @param codeValue value of the code
* @return a Json response containing the metadata of the
*/
def initiateWithOnetimeCode(codeValue:String) = Action.async {
initiateGuts(codeValue)(saveTokenAndGetDownload)
}
/**
* take a one-time code generated in LightboxController, validate and expire it, and return a long-term code.
* does NOT interrogate the LightboxBulkEntry, in v2 this is done seperately as it has a tendency to time out
* for larger restores
* @param codeValue value of the code
* @return a Json response containing the metadata of the
*/
def initiateWithOnetimeCodeV2(codeValue:String) = Action.async {
initiateGuts(codeValue)(saveTokenOnly)
}
/**
* get the bulk download summary for v2, as an NDJSON stream
* @param tokenValue long-term token to retrieve content
* @return
*/
def bulkDownloadSummary(tokenValue:String) = Action.async {
val tokenFut = serverTokenDAO.get(tokenValue)
tokenFut.flatMap({
case None =>
Future(Forbidden(GenericErrorResponse("forbidden", "invalid or expired token").asJson))
case Some(Left(err)) =>
Future(Forbidden(GenericErrorResponse("forbidden", "invalid or expired token").asJson))
case Some(Right(token)) =>
token.associatedId match {
case Some(associatedId) =>
lightboxBulkEntryDAO.entryForId(UUID.fromString(associatedId)).map({
case Some(Right(bulkEntry)) =>
val streamingSource = streamingEntriesForBulk(bulkEntry)
Result(
header = ResponseHeader(200, Map.empty),
body = HttpEntity.Streamed(streamingSource, None, Some("application/ndjson"))
)
case _ =>
logger.error(s"Could not retrieve lightbox bulk associated with ${token.associatedId.get}")
InternalServerError(GenericErrorResponse("db_error", "could not retrieve lightbox bulk").asJson)
})
case None =>
Future(NotFound(GenericErrorResponse("not_found", "token does not identify bulk").asJson))
}
})
}
/**
* common code for v1 and v2 initiate
* @param codeValue
* @param cb
* @return
*/
def initiateGuts(codeValue:String)(cb:(ServerTokenEntry, LightboxBulkEntry)=>Future[Result]) = {
serverTokenDAO.get(codeValue).flatMap({
case None=>
logger.error(s"No token exists for $codeValue")
Future(Forbidden(GenericErrorResponse("forbidden", "invalid or expired token").asJson))
case Some(Left(err))=>
logger.error(s"Could not verify one-time token: ${err.toString}")
Future(Forbidden(GenericErrorResponse("forbidden", "invalid or expired token").asJson))
case Some(Right(token))=>
val updatedToken = token.updateCheckExpired(maxUses=Some(1)).copy(uses = token.uses+1)
if(updatedToken.expired){
logger.error(s"Token ${updatedToken.value} is expired, denying access")
errorResponse(updatedToken)
} else {
token.associatedId match {
case None=>
logger.error(s"Token ${token.value} has no bulk associated with it!")
errorResponse(updatedToken)
case Some(associatedId)=>
if(associatedId=="loose"){
val looseBulkEntry = LightboxBulkEntry.forLoose(updatedToken.createdForUser.getOrElse("unknown"),-1)
cb(updatedToken, looseBulkEntry)
} else {
lightboxBulkEntryDAO.entryForId(UUID.fromString(associatedId)).flatMap({
case Some(Left(err)) => errorResponse(updatedToken)
case None => errorResponse(updatedToken)
case Some(Right(bulkEntry)) =>
cb(updatedToken, bulkEntry)
})
}
}
}
})
}
def getDownloadIdWithToken(tokenValue:String, fileId:String) = Action.async {
implicit val timeout:akka.util.Timeout = 30 seconds
serverTokenDAO.get(tokenValue).flatMap({
case None=>
Future(Forbidden(GenericErrorResponse("forbidden", "invalid or expired token").asJson))
case Some(Left(err))=>
Future(Forbidden(GenericErrorResponse("forbidden", "invalid or expired token").asJson))
case Some(Right(_))=>
indexer.getById(fileId).flatMap(archiveEntry=>{
val s3Client = s3ClientManager.getS3Client(profileName,archiveEntry.region.map(software.amazon.awssdk.regions.Region.of))
val response = (glacierRestoreActor ? GlacierRestoreActor.CheckRestoreStatusBasic(archiveEntry)).mapTo[GlacierRestoreActor.GRMsg]
response.map({
case GlacierRestoreActor.NotInArchive(entry)=>
getPresignedURL(archiveEntry, defaultLinkExpiry, defaultRegion, profileName)
.map(url=>Ok(RestoreStatusResponse("ok",entry.id, RestoreStatus.RS_UNNEEDED, None, Some(url.toString)).asJson))
case GlacierRestoreActor.RestoreCompleted(entry, expiry)=>
getPresignedURL(archiveEntry, defaultLinkExpiry, defaultRegion, profileName)
.map(url=>Ok(RestoreStatusResponse("ok", entry.id, RestoreStatus.RS_SUCCESS, Some(expiry), Some(url.toString)).asJson))
case GlacierRestoreActor.RestoreInProgress(entry)=>
Success(Ok(RestoreStatusResponse("ok", entry.id, RestoreStatus.RS_UNDERWAY, None, None).asJson))
case GlacierRestoreActor.RestoreNotRequested(entry)=>
if(entry.storageClass!=StorageClass.GLACIER){ //if the file is not registered as in Glacier, then update it.
val updatedEntry = entry.copy(storageClass=StorageClass.GLACIER)
indexer.indexSingleItem(updatedEntry, Some(updatedEntry.id)).onComplete({
case Failure(err)=>
logger.error(s"Could not update storage class for incorrect item $entry: $err")
case Success(_)=>
logger.info(s"Updated $entry as it had invalid storage class. Now requesting restore")
glacierRestoreActor ! GlacierRestoreActor.InitiateRestoreBasic(updatedEntry,None)
})
}
Success(Ok(RestoreStatusResponse("not_requested", entry.id, RestoreStatus.RS_ERROR, None, None).asJson))
case GlacierRestoreActor.ItemLost(entry)=>
logger.error(s"Bulk item ${entry.bucket}:${entry.path} is lost")
val updatedEntry = entry.copy(beenDeleted = true)
indexer.indexSingleItem(updatedEntry, Some(updatedEntry.id)).onComplete({
case Success(_)=>
logger.info(s"${entry.bucket}:${entry.path} has been updated to indicate it is lost")
case Failure(err)=>
logger.error(s"Could not update ${entry.bucket}:${entry.path}")
})
Success(NotFound(GenericErrorResponse("not_found","item has been deleted!").asJson))
case GlacierRestoreActor.RestoreFailure(err)=>
logger.error(s"Could not check restore status: ", err)
Success(InternalServerError(GenericErrorResponse("error",err.toString).asJson))
}).map({
case Success(httpResponse)=>httpResponse
case Failure(err)=>
logger.error(s"Could not get download for $fileId with token $tokenValue", err)
InternalServerError(GenericErrorResponse("error",err.toString).asJson)
})
}).recover({
case err:Throwable=>
logger.error(s"Could not ascertain glacier restore status for $fileId: ", err)
InternalServerError(GenericErrorResponse("error", err.toString).asJson)
})
})
}
}