app/controllers/BulkDownloadController.scala (275 lines of code) (raw):
package controllers
import java.time.{Instant, ZonedDateTime}
import akka.actor.ActorSystem
import akka.stream.{ClosedShape, Materializer, SourceShape}
import akka.stream.scaladsl.{GraphDSL, RunnableGraph, Sink, Source}
import auth.{BearerTokenAuth, Security}
import com.om.mxs.client.japi.{MatrixStore, SearchTerm, UserInfo, Vault}
import helpers.{MetadataHelper, SearchTermHelper, UserInfoCache}
import akka.util.ByteString
import javax.inject.{Inject, Singleton}
import models.{ArchiveEntryDownloadSynopsis, LightboxBulkEntry, ObjectMatrixEntry, ServerTokenDAO, ServerTokenEntry}
import play.api.Configuration
import play.api.libs.circe.Circe
import play.api.mvc.{AbstractController, AnyContent, ControllerComponents, Request, ResponseHeader, Result}
import responses.{BulkDownloadInitiateResponse, DownloadManagerItemResponse, GenericErrorResponse, GenericObjectResponse}
import io.circe.syntax._
import io.circe.generic.auto._
import play.api.cache.SyncCacheApi
import play.api.http.HttpEntity
import streamcomponents.{MakeDownloadSynopsis, MatrixStoreFileSourceWithRanges, OMFastContentSearchSource, OMFastSearchSource}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.Future
import scala.util.{Failure, Success}
/**
 * Controller implementing the vault bulk-download protocol.
 *
 * Flow, as implemented below:
 *  1. `initiate` (authenticated) stores a short-lived token whose associated ID is `projectId|vaultId`.
 *  2. `getBulkDownload` / `getToken` redeem that short-lived token (deleting it), create a long-lived retrieval
 *     token and return it together with the list of files in the project.
 *  3. `bulkDownloadItem` / `bulkDownloadItemData` / `bulkDownloadSummary` use the long-lived token to serve items.
 */
@Singleton
class BulkDownloadController @Inject() (cc:ControllerComponents,
                                        override implicit val config:Configuration,
                                        override val bearerTokenAuth:BearerTokenAuth,
                                        serverTokenDAO: ServerTokenDAO,
                                        userInfoCache: UserInfoCache)
                                       (implicit mat:Materializer, system:ActorSystem, override implicit val cache:SyncCacheApi)
  extends AbstractController(cc) with Circe with Security with ObjectMatrixEntryMixin {

  /**
   * Looks up the given vault ID in the UserInfoCache and calls the provided block with the resolved UserInfo object.
   * The block is expected to return a Play Result object and this is passed back as the ultimate result.
   * If the vault ID cannot be found then a NotFound response is returned and the block is not called.
   * @param vaultId vault ID to query
   * @param block a block that takes a single UserInfo argument and returns a Result
   * @return a Play Result, either the return value of the block or a NotFound
   */
  def withVault(vaultId:String)(block:UserInfo=>Result):Result = userInfoCache.infoForVaultId(vaultId) match {
    case Some(userInfo)=>block(userInfo)
    case None=>NotFound(GenericErrorResponse("not_found",vaultId).asJson)
  }

  /**
   * Asynchronous version of `withVault`: looks up the vault ID and calls `block` with the resolved UserInfo.
   * @param vaultId vault ID to query
   * @param block a block that takes a single UserInfo argument and returns a Future of a Result
   * @return the block's Future, or an already-completed Future holding a NotFound if the vault is unknown
   */
  def withVaultAsync(vaultId:String)(block:UserInfo=>Future[Result]):Future[Result] = userInfoCache.infoForVaultId(vaultId) match {
    case Some(userInfo)=>block(userInfo)
    case None=>Future.successful(NotFound(GenericErrorResponse("not_found",vaultId).asJson))
  }

  /**
   * Called from a logged-in browser session to initiate the download session.
   * Generates a one-time token tied to the user, project and vault, saves it and returns it to the frontend.
   * No search or validation is done on the vault or project IDs at this point.
   * It's assumed that the storage backend takes care of expiry.
   * @param vaultId Vault ID that is being targeted
   * @param projectId Project ID that is being targeted
   * @return a Play response
   */
  def initiate(vaultId:String,projectId:String) = IsAuthenticatedAsync { uid=> request=>
    val expiry = config.getOptional[Int]("serverToken.shortLivedDuration").getOrElse(10)
    //the associated ID format "projectId|vaultId" is decoded again by splitCombinedId
    val newToken = ServerTokenEntry.create(Some(s"$projectId|$vaultId"), expiry, Some(uid))
    serverTokenDAO.put(newToken, expiry)
      .map(_=>Ok(GenericObjectResponse("ok","link",s"archivehunter:vaultdownload:${newToken.value}").asJson))
      .recover({
        case err:Throwable=>
          logger.error(s"Could not save token with id ${newToken.value} for project ${projectId} to backend: ",err)
          InternalServerError(GenericErrorResponse("db_error","Could not save token").asJson)
      })
  }

  /**
   * Performs a streaming search against the ObjectMatrix vault and converts the returned results to
   * ArchiveEntryDownloadSynopsis objects for sending to Download Manager.
   * @param userInfo UserInfo instance indicating the appliance and vault to target
   * @param projectId project ID to search for
   * @param onlyRushes passed through to SearchTermHelper.projectIdQuery to restrict the search
   * @return a Future of Right(synopses) with one ArchiveEntryDownloadSynopsis per file of the project,
   *         or Left(message) if no query could be built for the project ID
   */
  def getContent(userInfo:UserInfo, projectId:String, onlyRushes:Boolean=true) = {
    val usefulFields = Array("MXFS_PATH","MXFS_FILENAME","DPSP_SIZE")
    SearchTermHelper.projectIdQuery(projectId, onlyRushes) match {
      case Some(projectQuery) =>
        val sinkFact = Sink.seq[ArchiveEntryDownloadSynopsis]
        val graph = GraphDSL.create(sinkFact) { implicit builder =>
          sink =>
            import akka.stream.scaladsl.GraphDSL.Implicits._
            val src = builder.add(new OMFastContentSearchSource(userInfo, projectQuery.withKeywords(usefulFields).build))
            val converter = builder.add(new MakeDownloadSynopsis(config.getOptional[Seq[String]]("bulkDownload.stripPrefixes")))
            src ~> converter ~> sink
            ClosedShape
        }
        RunnableGraph.fromGraph(graph).run().map(Right.apply)
      case None =>
        Future.successful(Left("project ID is invalid"))
    }
  }

  /**
   * Create and save a long-term token to allow the download of items.
   * @param uid user ID creating the token
   * @param combinedId combined vault and project ID string
   * @return the saved ServerTokenEntry in a Future
   */
  def createRetrievalToken(uid:String, combinedId:String):Future[ServerTokenEntry] = {
    val expiry = config.getOptional[Int]("serverToken.longLivedDuration").getOrElse(3600)
    val newToken = ServerTokenEntry.create(Some(combinedId), expiry, Some(uid))
    serverTokenDAO.put(newToken, expiry).map(_=>newToken)
  }

  /**
   * Splits a token's associated ID (written by `initiate` as "projectId|vaultId") into its two parts.
   * Any further '|'-separated segments are ignored.
   * @param combinedId associated ID string from a ServerTokenEntry
   * @return Some((projectId, vaultId)), or None if the string does not contain both parts
   */
  private def splitCombinedId(combinedId:String):Option[(String,String)] =
    combinedId.split("\\|") match {
      case Array(projectId, vaultId, _*) => Some((projectId, vaultId))
      case _ => None
    }

  /**
   * Decodes a token's associated ID and calls `block` with (combinedId, projectId, vaultId).
   * If the associated ID is missing or malformed, logs an error and returns a NotFound "Invalid token" response
   * instead of throwing.
   * @param tokenId token value, used only for logging
   * @param associatedId the token's associatedId field
   * @param block continuation receiving the raw combined ID, the project ID and the vault ID
   */
  private def withProjectAndVault(tokenId:String, associatedId:Option[String])
                                 (block:(String,String,String)=>Future[Result]):Future[Result] =
    associatedId match {
      case None=>
        logger.error(s"Token $tokenId is invalid, it does not contain a project ID")
        Future.successful(NotFound(GenericErrorResponse("not_found","Invalid token").asJson))
      case Some(combinedId)=>
        splitCombinedId(combinedId) match {
          case Some((projectId, vaultId))=>
            logger.debug(s"Combined ID is $combinedId, project ID is $projectId, vault ID is $vaultId")
            block(combinedId, projectId, vaultId)
          case None=>
            logger.error(s"Token $tokenId is invalid, its associated ID '$combinedId' is not of the form projectId|vaultId")
            Future.successful(NotFound(GenericErrorResponse("not_found","Invalid token").asJson))
        }
    }

  /**
   * Shared implementation of `getBulkDownload` and `getToken`.
   * Looks up the short-lived token, deletes it, creates a long-lived retrieval token and returns that
   * together with the content list of the project. Any failure along the way (including the initial
   * token lookup) is turned into a JSON InternalServerError.
   */
  private def redeemShortLivedToken(tokenId:String, notOnlyRushes:Option[Boolean]):Future[Result] =
    serverTokenDAO.get(tokenId).flatMap {
      case None=>
        Future.successful(NotFound(GenericErrorResponse("not_found","No such bulk download").asJson))
      case Some(token)=>
        //the short-lived token is one-time-use, so it is removed before anything else happens
        serverTokenDAO.remove(tokenId).flatMap { _=>
          withProjectAndVault(tokenId, token.associatedId) { (combinedId, projectId, vaultId)=>
            withVaultAsync(vaultId) { userInfo=>
              val userId = token.createdForUser.getOrElse("")
              createRetrievalToken(userId, combinedId).flatMap { retrievalToken=>
                getContent(userInfo, projectId, !notOnlyRushes.getOrElse(false)).map({
                  case Right(synopses)=>
                    val meta = LightboxBulkEntry(projectId, s"Vaultdoor download for project $projectId", userId, ZonedDateTime.now(), 0, synopses.length, 0)
                    Ok(BulkDownloadInitiateResponse("ok", meta, retrievalToken.value, synopses).asJson)
                  case Left(problem)=>
                    logger.warn(s"Could not complete bulk download for token $tokenId: $problem")
                    BadRequest(GenericErrorResponse("invalid", problem).asJson)
                })
              }
            }
          }
        }
    }.recover({
      case err:Throwable=>
        logger.error(s"Could not get bulk download for token $tokenId: ", err)
        InternalServerError(GenericErrorResponse("error","Server failure, please check the logs").asJson)
    })

  /**
   * Handle the return of a short-lived token. Validate it and if it passes delete it, then send back a long-lived
   * token and a list of items to download.
   * @param tokenId the ID of a short-lived token
   * @param notOnlyRushes if Some(true), the content search is not restricted to rushes
   * @return a Play response
   */
  def getBulkDownload(tokenId:String, notOnlyRushes:Option[Boolean]) = Action.async {
    redeemShortLivedToken(tokenId, notOnlyRushes)
  }

  /**
   * Returns the first option if it is defined, otherwise the second.
   */
  def eitherOr[T](opt1:Option[T],opt2:Option[T]):Option[T] = opt1.orElse(opt2)

  /**
   * Validates that there is a token (explicitly supplied, or in the X-Download-Token header) and that it is valid.
   * If so, increments its use count, saves it back and passes it on to the provided block, returning the result.
   * Otherwise returns a JSON-formatted Play response indicating the error.
   * @param request request object
   * @param actualTokenValue token value to use in preference to the X-Download-Token header
   * @param block function to process the request if the token is valid.
   *              Takes three parameters, being the entire ServerTokenEntry, the project ID and the vault ID as encoded in the ServerTokenEntry
   * @return the result of the block or a JSON formatted error response
   */
  def validateTokenAsync(request:Request[AnyContent], actualTokenValue:Option[String])(block: (ServerTokenEntry,String,String)=>Future[Result]):Future[Result] =
    actualTokenValue.orElse(request.headers.get("X-Download-Token")) match {
      case None=>
        logger.error(s"Attempt to download with no X-Download-Token header")
        Future.successful(BadRequest(GenericErrorResponse("bad_request","No download token in headers").asJson))
      case Some(tokenId)=>
        logger.debug(s"Got token ID $tokenId")
        serverTokenDAO.get(tokenId).flatMap {
          case None=>
            Future.successful(NotFound(GenericErrorResponse("not_found","item was not found").asJson))
          case Some(serverToken)=>
            logger.debug(s"Got server token $serverToken for $tokenId")
            serverToken.expiry match {
              case None=>
                //without an expiry time we cannot compute the TTL to save the token back with, so reject it
                logger.error(s"Server token $serverToken has no expiry time")
                Future.successful(BadRequest(GenericErrorResponse("token_error","Invalid token").asJson))
              case Some(expiryTime)=>
                val expirySeconds = expiryTime.toInstant.getEpochSecond - Instant.now().getEpochSecond
                if(expirySeconds<1){
                  logger.error(s"Server token $serverToken is expired")
                  Future.successful(BadRequest(GenericErrorResponse("token_error","Expired token").asJson))
                } else {
                  val updatedServerToken = serverToken.copy(uses=serverToken.uses+1)
                  //save back with the remaining lifetime so the original expiry is preserved
                  serverTokenDAO.put(updatedServerToken, expirySeconds.toInt).flatMap { _=>
                    withProjectAndVault(tokenId, serverToken.associatedId) { (_, projectId, vaultId)=>
                      block(serverToken, projectId, vaultId)
                    }
                  }
                }
            }
        }
    }

  /**
   * Builds an Akka Source that streams the content of the given entry via MatrixStoreFileSourceWithRanges.
   * The empty Seq() presumably requests no byte ranges, i.e. the whole file — TODO confirm.
   * Throws NoSuchElementException if the entry has no file attributes.
   */
  def getStreamingSourceFor(userInfo:UserInfo, omEntry:ObjectMatrixEntry) = {
    Source.fromGraph(GraphDSL.create() { implicit builder =>
      val src = builder.add(new MatrixStoreFileSourceWithRanges(userInfo,omEntry.oid,omEntry.fileAttribues.get.size,Seq()))
      SourceShape(src.out)
    })
  }

  /**
   * Returns the overridden size if given, otherwise the size from the entry's file attributes (if any).
   */
  def getMaybeResponseSize(entry:ObjectMatrixEntry, overriden:Option[Long]):Option[Long] =
    overriden.orElse(entry.fileAttribues.map(_.size))

  /**
   * Returns the MXFS_MIMETYPE string attribute of the entry, if present.
   */
  def getMaybeMimetype(entry:ObjectMatrixEntry):Option[String] = entry.attributes.flatMap(_.stringValues.get("MXFS_MIMETYPE"))

  /**
   * Returns the URL from which the data for the given item can be streamed.
   */
  def bulkDownloadItem(tokenValue:String, itemId:String) = Action {
    //in ArchiveHunter this step is necessary to compute a presigned URL. Here it's not, we can stream the data right away.
    //but to keep the same protocol, we just send back the "right" URL for the data and a flag saying it's available.
    Ok(DownloadManagerItemResponse("ok","RS_ALREADY",s"/api/bulk/$tokenValue/get/$itemId/data").asJson)
  }

  /**
   * Streams the content of a single item, after validating the long-lived token and checking that the item's
   * GNM_PROJECT_ID matches the project encoded in the token. Adds an ETag with the appliance MD5 when available.
   * @param tokenValue long-lived retrieval token
   * @param itemId object ID of the item within the vault
   */
  def bulkDownloadItemData(tokenValue:String, itemId:String) = Action.async { request=>
    //debug rather than warn: this runs for every item of every bulk download and includes the token value
    logger.debug(s"bulkDownloadItem: token is $tokenValue, item ID is $itemId")
    validateTokenAsync(request, Some(tokenValue)) { (_, projectId, vaultId)=>
      logger.debug(s"In bulkDownloadItem for $itemId on vault $vaultId")
      userInfoCache.infoForVaultId(vaultId) match {
        case None=>
          logger.error(s"bulkDownloadItem - vaultId is not valid, had no userInfoCache entry")
          Future.successful(NotFound(GenericErrorResponse("not_found","item or vault not found").asJson))
        case Some(userInfo)=>
          implicit val vault:Vault = MatrixStore.openVault(userInfo)
          ObjectMatrixEntry(itemId).getMetadata.map(entry=>
            entry.attributes.flatMap(_.stringValues.get("GNM_PROJECT_ID")) match {
              case None=>
                logger.error(s"Item $itemId found in vault $vaultId but it is not a member of any project!")
                NotFound(GenericErrorResponse("not_found","item or vault not found").asJson)
              case Some(itemsProjectId) if itemsProjectId!=projectId=>
                logger.error(s"Item $itemId found in vault $vaultId but it is a member of project $itemsProjectId not $projectId")
                NotFound(GenericErrorResponse("not_found","item or vault not found").asJson)
              case Some(_)=>
                val headers = headersForEntry(entry, Seq(), getMaybeResponseSize(entry, None))
                val updatedHeaders = MetadataHelper.getOMFileMd5(vault.getObject(itemId)) match {
                  case Failure(err)=>
                    logger.warn(s"Could not get appliance MD5: ", err)
                    headers
                  case Success(checksum)=>headers + ("ETag"->checksum)
                }
                //the streaming source is built from userInfo, not from this vault handle
                Result(
                  ResponseHeader(200, updatedHeaders),
                  HttpEntity.Streamed(getStreamingSourceFor(userInfo, entry), getMaybeResponseSize(entry, None), getMaybeMimetype(entry))
                )
            }
          ).recover({
            case err:Throwable=>
              logger.error(s"Could not get metadata for $itemId: ", err)
              InternalServerError(GenericErrorResponse("sever_error","Could not get metadata").asJson)
          }).andThen({
            //release the vault exactly once, whichever path was taken
            case _ => vault.dispose()
          })
      }
    }
  }

  /**
   * Handle the return of a short-lived token. Validate it and if it passes delete it, then send back a long-lived
   * token. This currently behaves identically to `getBulkDownload` (the response also includes the content list).
   * @param tokenId The ID of a short-lived token
   * @param notOnlyRushes if Some(true), the content search is not restricted to rushes
   * @return a Play response
   */
  def getToken(tokenId:String, notOnlyRushes:Option[Boolean]) = Action.async {
    redeemShortLivedToken(tokenId, notOnlyRushes)
  }

  /**
   * Serialises the synopses as newline-delimited JSON: one compact JSON object per line, each line
   * terminated with "\n". An empty input gives an empty string.
   */
  def outPutSynopses(synopses:Seq[ArchiveEntryDownloadSynopsis]):String =
    synopses.map(_.asJson.noSpaces + "\n").mkString

  /**
   * Get the bulk download summary for v2, as NDJSON.
   * @param tokenValue Long-term token to retrieve content
   * @param notOnlyRushes if Some(true), the content search is not restricted to rushes
   * @return a Play response with content type application/ndjson, or a JSON error response
   */
  def bulkDownloadSummary(tokenValue:String, notOnlyRushes:Option[Boolean]) = Action.async {
    serverTokenDAO.get(tokenValue).flatMap {
      case None =>
        Future.successful(Forbidden(GenericErrorResponse("forbidden", "invalid or expired token").asJson))
      case Some(token) =>
        withProjectAndVault(tokenValue, token.associatedId) { (_, projectId, vaultId) =>
          withVaultAsync(vaultId) { userInfo =>
            getContent(userInfo, projectId, !notOnlyRushes.getOrElse(false)).map({
              case Right(synopses) =>
                Result(
                  header = ResponseHeader(200, Map.empty),
                  body = HttpEntity.Strict(ByteString.fromString(outPutSynopses(synopses)), Some("application/ndjson"))
                )
              case Left(problem) =>
                logger.warn(s"Could not complete bulk download for token $tokenValue: $problem")
                BadRequest(GenericErrorResponse("invalid", problem).asJson)
            })
          }
        }
    }.recover({
      case err:Throwable =>
        logger.error(s"Could not get bulk download summary for token $tokenValue: ", err)
        InternalServerError(GenericErrorResponse("error","Server failure, please check the logs").asJson)
    })
  }
}