app/controllers/Application.scala (188 lines of code) (raw):

package controllers import java.net.URI import akka.actor.{ActorRef, ActorSystem} import akka.stream.scaladsl.{Framing, GraphDSL, Keep, Sink, Source} import helpers.{ OMLocator, RangeHeader, UserInfoCache} import javax.inject.{Inject, Named, Singleton} import org.slf4j.LoggerFactory import play.api.Configuration import play.api.http.HttpEntity import play.api.mvc._ import helpers.BadDataError import scala.util.{Failure, Success, Try} import akka.pattern.ask import akka.stream.{Attributes, Materializer, SourceShape} import com.om.mxs.client.japi.{MatrixStore, UserInfo, Vault} import streamcomponents.{MatrixStoreFileSourceWithRanges, MultipartSource} import models.{AuditEvent, AuditFile, ObjectMatrixEntry} import streamcomponents.{AuditLogFinish, MatrixStoreFileSourceWithRanges} import scala.concurrent.duration._ import scala.concurrent.ExecutionContext.Implicits.global import scala.concurrent.Future import play.api.cache.SyncCacheApi import auth.{BearerTokenAuth, Security} import play.api.libs.circe.Circe import responses.{FrontendConfigResponse, GenericErrorResponse} import io.circe.syntax._ import io.circe.generic.auto._ @Singleton class Application @Inject() (cc:ControllerComponents, override implicit val config:Configuration, override val bearerTokenAuth:BearerTokenAuth, @Named("object-cache") objectCache:ActorRef, @Named("audit-actor") auditActor:ActorRef, userInfoCache:UserInfoCache )(implicit mat:Materializer,system:ActorSystem, override implicit val cache:SyncCacheApi) extends AbstractController(cc) with ObjectMatrixEntryMixin with Security with Circe { import actors.ObjectCache._ override protected val logger = LoggerFactory.getLogger(getClass) private lazy val bufferSize = config.getOptional[Int]("vaults.streamingBufferSize").map(s=>s * 1024*1024).getOrElse(128*1024*1024) private implicit val timeout:akka.util.Timeout = 30.seconds def index = Action { Ok(views.html.index("VaultDoor")("no-cb")) } def getMaybeResponseSize(entry:ObjectMatrixEntry, overriden:Option[Long]):Option[Long] = { overriden match { case value @Some(_)=>value case None=>entry.fileAttribues.map(_.size) } } def getMaybeMimetype(entry:ObjectMatrixEntry):Option[String] = entry.attributes.flatMap(_.stringValues.get("MXFS_MIMETYPE")) def headTargetContent(targetUriString:String) = IsAuthenticatedAsync { uid=> request=> val maybeTargetUri = Try { URI.create(targetUriString) } val maybeLocator = maybeTargetUri.flatMap(targetUri => OMLocator.fromUri(targetUri)) /* look up the object, using cache if possible, and get hold of the metadata */ val objectEntryFut = Future.fromTry(maybeLocator).flatMap(locator=>{ (objectCache ? Lookup(locator)).mapTo[OCMsg].map({ case ObjectNotFound(_) => val auditFile = AuditFile("",locator.filePath) auditActor ! actors.Audit.LogEvent(AuditEvent.NOTFOUND, uid, Some(auditFile), Seq()) Left(NotFound(s"could not find object $targetUriString")) //FIXME: replace with proper json response case ObjectLookupFailed(_, err) => val auditFile = AuditFile("",locator.filePath) auditActor ! actors.Audit.LogEvent(AuditEvent.OMERROR, uid, Some(auditFile), Seq(),notes=Some(err.toString)) logger.error(s"Could not look up object for $targetUriString: ", err) Left(InternalServerError(s"lookup failed for $targetUriString")) case ObjectFound(_, objectEntry) => val auditFile = AuditFile(objectEntry.oid,locator.filePath) auditActor ! actors.Audit.LogEvent(AuditEvent.HEADFILE, uid, Some(auditFile), Seq()) Right(objectEntry) }) }) objectEntryFut.map({ case Left(response)=>response case Right(entry)=> Result( ResponseHeader(200,headersForEntry(entry, Seq(), getMaybeResponseSize(entry, None))), HttpEntity.Streamed(Source.empty, getMaybeResponseSize(entry, None), getMaybeMimetype(entry)) ) }) } /** * gets a multipart source if needed or just gets a single source if no ranges specified * @param ranges a sequence of [[RangeHeader]] objects. If empty a single source for the entire file is returned * @param userInfo userInfo object describing the appliance and vault to target * @param omEntry [[ObjectMatrixEntry]] instance describing the file to target * @return an akka Source that yields ByteString contents of the file */ def getStreamingSource(ranges:Seq[RangeHeader], userInfo:UserInfo, omEntry:ObjectMatrixEntry, auditFile:AuditFile, uid:String) = Try { import akka.stream.scaladsl.GraphDSL.Implicits._ val partialGraph = if(ranges.length>1) { val mpSep = MultipartSource.genSeparatorText val rangesAndSources = MultipartSource.makeSources(ranges, userInfo, omEntry) GraphDSL.create() { implicit builder => val src = builder.add(MultipartSource.getSource(rangesAndSources, omEntry.fileAttribues.get.size, "application/octet-stream", mpSep)) val audit = builder.add(new AuditLogFinish(auditActor,auditFile,uid, omEntry.fileAttribues.get.size)) src ~> audit SourceShape(audit.out) } } else { GraphDSL.create() { implicit builder=> val src = builder.add(new MatrixStoreFileSourceWithRanges(userInfo,omEntry.oid,omEntry.fileAttribues.get.size,ranges)) val audit = builder.add(new AuditLogFinish(auditActor,auditFile,uid, omEntry.fileAttribues.get.size)) src ~> audit SourceShape(audit.out) } } Source.fromGraph(partialGraph) } /** * use the MatrixStoreFileSourceWithRanges to efficiently stream ranges of content * @param targetUriString omms URI of the object that we are trying to get * @return */ def streamTargetContent(targetUriString:String) = IsAuthenticatedAsync { uid=> request=> val maybeTargetUri = Try { URI.create(targetUriString) } val maybeLocator = maybeTargetUri.flatMap(targetUri => OMLocator.fromUri(targetUri)) /* look up the object, using cache if possible, and get hold of the metadata */ val objectEntryFut = Future.fromTry(maybeLocator).flatMap(locator=>{ (objectCache ? Lookup(locator)).mapTo[OCMsg].map({ case ObjectNotFound(_) => val auditFile = AuditFile("",locator.filePath) auditActor ! actors.Audit.LogEvent(AuditEvent.NOTFOUND, uid, Some(auditFile), Seq()) Left(NotFound(GenericErrorResponse("not_found", s"no object at $targetUriString").asJson)) case ObjectLookupFailed(_, err) => val auditFile = AuditFile("",locator.filePath) auditActor ! actors.Audit.LogEvent(AuditEvent.OMERROR, uid, Some(auditFile), Seq(),notes=Some(err.toString)) logger.error(s"Could not look up object for $targetUriString: ", err) Left(InternalServerError(GenericErrorResponse("server_error", s"lookup failed for $targetUriString").asJson)) case ObjectFound(_, objectEntry) => Right(objectEntry) }) }) /* break down the ranges header into structured data */ val rangesOrFailureFut = Future.fromTry(request.headers.get("Range") match { case Some(hdr)=>RangeHeader.fromStringHeader(hdr) case None=>Success(Seq()) }) /* get hold of a streaming source, if possible */ //maybeLocator.get is safe because if maybeLocator is a Failure we don't execute this block val srcOrFailureFut = getSourceFuture(maybeLocator.get, objectEntryFut, rangesOrFailureFut, uid) /* now either manifest the source to stream data to the client or output an error response to explain why we couldn't */ srcOrFailureFut.map({ case Right((byteSource, maybeResponseSize, headers, maybeMimetype, isPartialTransfer)) => logger.info(s"maybeResponseSize is $maybeResponseSize") Result( ResponseHeader(if(isPartialTransfer) 206 else 200, headers), HttpEntity.Streamed(byteSource.log("outputstream").addAttributes( Attributes.logLevels( onElement = Attributes.LogLevels.Info, onFailure = Attributes.LogLevels.Error, onFinish = Attributes.LogLevels.Info)), None, maybeMimetype) ) //we can't give a proper content length, because if we are sending multipart chunks that adds overhead to the request size. case Left(response)=>response }).recover({ case err:BadDataError=> BadRequest(err.getMessage) case err:Throwable=> logger.error(s"Could not get data for $targetUriString: ", err) InternalServerError("see the logs for more information") }) } private def getSourceFuture(locator:OMLocator, objectEntryFut:Future[Either[Result, ObjectMatrixEntry]], rangesOrFailureFut:Future[Seq[RangeHeader]], uid:String) = { Future.sequence(Seq(objectEntryFut,rangesOrFailureFut)).map(results=>{ val ranges = results(1).asInstanceOf[Seq[RangeHeader]] results.head.asInstanceOf[Either[Result,ObjectMatrixEntry]] match { case Right(omEntry)=> withUserInfo(locator) { userInfo => val responseSize = if (ranges.nonEmpty) { Some(ranges.foldLeft(0L)((acc, range) => acc + (range.end.getOrElse(omEntry.fileAttribues.get.size) - range.start.getOrElse(0L)))) } else { omEntry.fileAttribues.map(_.size) } //log that we are starting a streamout val auditFile = AuditFile(omEntry.oid, locator.filePath) auditActor ! actors.Audit.LogEvent(AuditEvent.STREAMOUT, uid, Some(auditFile), ranges) getStreamingSource(ranges, userInfo, omEntry, auditFile, uid) match { case Success(partialGraph) => Right((Source.fromGraph(partialGraph), responseSize, headersForEntry(omEntry, ranges, responseSize), getMaybeMimetype(omEntry), ranges.nonEmpty)) case Failure(err) => //if we did not get a source, log that auditActor ! actors.Audit.LogEvent(AuditEvent.OMERROR, uid, Some(auditFile), ranges, notes = Some(err.getMessage)) logger.error(s"Could not set up streaming source: ", err) Left(InternalServerError(s"Could not set up streaming source, see logs for more details")) } } case Left(err)=> Left(err) }}) } def withUserInfo[T](locator:OMLocator)(block: (UserInfo)=>Either[Result, T]) = { userInfoCache.infoForAddress(locator.host, locator.vaultId.toString).map(_.getUserInfo) match { case Some(Success(userInfo))=>block(userInfo) case None=>Left(NotFound(GenericErrorResponse("not_found", "no vault found").asJson)) case Some(Failure(err))=> logger.error(s"Could not get userInfo for $locator: ${err.getMessage}", err) Left(InternalServerError(GenericErrorResponse("internal_error", "Could not get UserInfo, see server logs for details").asJson)) } } def frontendConfig = Action { Ok(FrontendConfigResponse("ok", config.get[String]("projectlocker.baseUri"), config.getOptional[String]("pluto.baseUri") ).asJson) } }