app/controllers/VaultController.scala (184 lines of code) (raw):

package controllers import akka.actor.{ActorRef, ActorSystem} import akka.stream.scaladsl.{GraphDSL, RunnableGraph, Sink, Source} import akka.stream.{Attributes, ClosedShape, Materializer, SourceShape} import auth.{BearerTokenAuth, Security} import com.om.mxs.client.japi.{MatrixStore, SearchTerm, UserInfo, Vault} import helpers.{RangeHeader, UserInfoCache, ZonedDateTimeEncoder} import javax.inject.{Inject, Named, Singleton} import play.api.Configuration import play.api.cache.SyncCacheApi import play.api.libs.circe.Circe import play.api.mvc.{AbstractController, AnyContent, ControllerComponents, Request, ResponseHeader, Result} import responses.{GenericErrorResponse, KnownVaultResponse, SingleItemDownloadTokenResponse} import io.circe.generic.auto._ import io.circe.syntax._ import models.{AuditEvent, AuditFile, CachedEntry, ExistingArchiveContentCache, ObjectMatrixEntry, ServerTokenDAO, ServerTokenEntry} import play.api.http.HttpEntity import streamcomponents.{AuditLogFinish, MatrixStoreFileSourceWithRanges, MultipartSource, OMFastSearchSource} import akka.pattern.ask import services.DuplicateFinderService import scala.concurrent.Future import scala.util.{Failure, Success, Try} import scala.concurrent.ExecutionContext.Implicits.global @Singleton class VaultController @Inject() (cc:ControllerComponents, override implicit val config:Configuration, override val bearerTokenAuth:BearerTokenAuth, @Named("audit-actor") auditActor:ActorRef, userInfoCache:UserInfoCache, serverTokenDAO: ServerTokenDAO, dupFinder: DuplicateFinderService )(implicit mat:Materializer,system:ActorSystem, override implicit val cache:SyncCacheApi) extends AbstractController(cc) with Security with ObjectMatrixEntryMixin with Circe with ZonedDateTimeEncoder{ def knownVaults() = IsAuthenticated { uid=> request=> val content = userInfoCache.byVaultId val responses = content.values .flatMap(_.headOption) .map(KnownVaultResponse.fromBuilder) Ok(responses.asJson) } /** * execute a block with a vault for the given ID and ensure that the vault connection is disposed of when * we finish, regardless of whether an exception occurred * @param vaultId the Vault ID (as a string) * @param block function to call as a block. Takes the Vault reference as a parameter and should return a Result * @return */ def withVaultForId(vaultId:String)(block: Vault=>Future[Result]) = { val maybeResult = userInfoCache.infoForVaultId(vaultId) match { case Some(userInfo)=> val maybeVault = Future { MatrixStore.openVault(userInfo)} maybeVault .flatMap(v=>block(v).andThen({ case _=>v.dispose() })) case None=>Future(NotFound(GenericErrorResponse("not_found","either the vault or file id is not valid").asJson)) } maybeResult.recover({ case err:Throwable=> logger.error(s"Could not perform vault operation: ", err) InternalServerError(GenericErrorResponse("server_error", err.getMessage).asJson) }) } def withVaultForIdSync(vaultId:String)(block: Vault=>Result) = withVaultForId(vaultId) { v=>Future{block(v)}} def getMaybeResponseSize(entry:ObjectMatrixEntry, overriden:Option[Long]):Option[Long] = { overriden match { case value @Some(_)=>value case None=>entry.fileAttribues.map(_.size) } } def getMaybeMimetype(entry:ObjectMatrixEntry):Option[String] = entry.attributes.flatMap(_.stringValues.get("MXFS_MIMETYPE")) /** * return the metadata for the given oid on the given vault, for a HEAD request * @param vaultId vault ID to query * @param oid object ID to query * @return */ def headTargetContent(vaultId:String, oid:String) = IsAuthenticatedAsync { uid=> request=> withVaultForId(vaultId) { implicit vault=> val initialEntry = ObjectMatrixEntry(oid) initialEntry.getMetadata.map(entry=> Result( ResponseHeader(200,headersForEntry(entry, Seq(), getMaybeResponseSize(entry, None))), HttpEntity.Streamed(Source.empty, getMaybeResponseSize(entry, None), getMaybeMimetype(entry)) )) } } def createSingleDownloadToken(vaultId:String, oid:String) = IsAuthenticatedAsync { uid=> request=> withVaultForId(vaultId) { _=> val expiry = config.getOptional[Int]("serverToken.shortLivedDuration").getOrElse(10) val token = ServerTokenEntry.create(Some(s"${vaultId}:${oid}"), forUser=Some(uid)) serverTokenDAO.put(token, expiry).map({ case true=> Ok(SingleItemDownloadTokenResponse(s"/api/rawdownload/${token.value}").asJson) case false=> InternalServerError(GenericErrorResponse("error","Could not save server token, see logs for details").asJson) }) .recover({ case err:Throwable=> logger.error(s"Could not save token: ${err.getMessage}", err) InternalServerError(GenericErrorResponse("error","Save token operation failed, see server logs").asJson) }) } } private def splitAssociatedIds(from:Option[String]):Option[(String,String)] = from.flatMap(str=>{ val parts = str.split(":") if(parts.length==2){ Some((parts.head, parts(1))) } else { None } }) def singleTokenDownload(token:String) = Action.async { request=> serverTokenDAO.get(token).flatMap({ case Some(serverToken)=> serverTokenDAO.remove(token) splitAssociatedIds(serverToken.associatedId) match { case Some((vaultId, oid))=> streamTargetContent(vaultId, oid, request, serverToken.createdForUser.getOrElse("anonymous")) case None=> logger.error(s"Server token $token for ${serverToken.createdForUser} was invalid, did not have vault and object IDs") Future(NotFound(GenericErrorResponse("not_found","invalid token").asJson)) } case None=> Future(NotFound(GenericErrorResponse("not_found","").asJson)) }) } /** * gets a multipart source if needed or just gets a single source if no ranges specified * @param ranges a sequence of [[RangeHeader]] objects. If empty a single source for the entire file is returned * @param userInfo userInfo object describing the appliance and vault to target * @param omEntry [[ObjectMatrixEntry]] instance describing the file to target * @return an akka Source that yields ByteString contents of the file */ def getStreamingSource(ranges:Seq[RangeHeader], userInfo:UserInfo, omEntry:ObjectMatrixEntry, auditFile:AuditFile, uid:String) = Try { import akka.stream.scaladsl.GraphDSL.Implicits._ val partialGraph = if(ranges.length>1) { val mpSep = MultipartSource.genSeparatorText val rangesAndSources = MultipartSource.makeSources(ranges, userInfo, omEntry) GraphDSL.create() { implicit builder => val src = builder.add(MultipartSource.getSource(rangesAndSources, omEntry.fileAttribues.get.size, "application/octet-stream", mpSep)) val audit = builder.add(new AuditLogFinish(auditActor,auditFile,uid, omEntry.fileAttribues.get.size)) src ~> audit SourceShape(audit.out) } } else { GraphDSL.create() { implicit builder=> val src = builder.add(new MatrixStoreFileSourceWithRanges(userInfo,omEntry.oid,omEntry.fileAttribues.get.size,ranges)) val audit = builder.add(new AuditLogFinish(auditActor,auditFile,uid, omEntry.fileAttribues.get.size)) src ~> audit SourceShape(audit.out) } } Source.fromGraph(partialGraph) } /** * third test, use the MatrixStoreFileSourceWithRanges to efficiently stream ranges of content * @param targetUriString omms URI of the object that we are trying to get * @return */ def streamTargetContent(vaultId:String, oid:String, request:Request[AnyContent], uid:String) = { /* break down the ranges header into structured data */ val rangesOrFailure = request.headers.get("Range") match { case Some(hdr)=>RangeHeader.fromStringHeader(hdr) case None=>Success(Seq()) } withVaultForIdSync(vaultId) { implicit vault => val omEntry = ObjectMatrixEntry(oid).getMetadataSync /* get hold of a streaming source, if possible */ val maybeResult = rangesOrFailure.flatMap(ranges => { val userInfo = userInfoCache.infoForVaultId(vaultId) val responseSize = if (ranges.nonEmpty) { Some(ranges.foldLeft(0L)((acc, range) => acc + (range.end.getOrElse(omEntry.fileAttribues.get.size) - range.start.getOrElse(0L)))) } else { omEntry.fileAttribues.map(_.size) } logger.debug(s"Ranges is ${ranges}") //log that we are starting a streamout val auditFile = AuditFile(omEntry.oid, "") auditActor ! actors.Audit.LogEvent(AuditEvent.STREAMOUT, uid, Some(auditFile), ranges) getStreamingSource(ranges, userInfo.get, omEntry, auditFile, uid) match { case Success(partialGraph) => Success((Source.fromGraph(partialGraph), responseSize, headersForEntry(omEntry, ranges, responseSize), getMaybeMimetype(omEntry), ranges.nonEmpty)) case Failure(err) => //if we did not get a source, log that auditActor ! actors.Audit.LogEvent(AuditEvent.OMERROR, uid, Some(auditFile), ranges, notes = Some(err.getMessage)) logger.error(s"Could not set up streaming source: ", err) Failure(new RuntimeException("Could not set up streaming source, see logs for more details")) } }) maybeResult match { case Success((byteSource, maybeResponseSize, headers, maybeMimetype, isPartialTransfer)) => logger.debug(s"maybeResponseSize is $maybeResponseSize") Result( ResponseHeader(if (isPartialTransfer) 206 else 200, headers), HttpEntity.Streamed(byteSource.log("outputstream").addAttributes( Attributes.logLevels( onElement = Attributes.LogLevels.Info, onFailure = Attributes.LogLevels.Error, onFinish = Attributes.LogLevels.Info)), None, maybeMimetype) ) //we can't give a proper content length, because if we are sending multipart chunks that adds overhead to the request size. case Failure(err) => InternalServerError(GenericErrorResponse("error", err.getMessage).asJson) } } } def findDuplicates(vaultId:String) = IsAuthenticatedAsync { uid=> request=> dupFinder.getDuplicateData(vaultId).map(result=>{ Ok(result.asJson)} ).recover({ case _:Throwable=> BadRequest(GenericErrorResponse("error", "An error occurred when attempting to load duplicate data.").asJson) }) } }