in app/controllers/VaultController.scala [177:230]
def streamTargetContent(vaultId:String, oid:String, request:Request[AnyContent], uid:String) = {
/*
break down the ranges header into structured data
*/
val rangesOrFailure = request.headers.get("Range") match {
case Some(hdr)=>RangeHeader.fromStringHeader(hdr)
case None=>Success(Seq())
}
withVaultForIdSync(vaultId) { implicit vault =>
val omEntry = ObjectMatrixEntry(oid).getMetadataSync
/*
get hold of a streaming source, if possible
*/
val maybeResult = rangesOrFailure.flatMap(ranges => {
val userInfo = userInfoCache.infoForVaultId(vaultId)
val responseSize = if (ranges.nonEmpty) {
Some(ranges.foldLeft(0L)((acc, range) => acc + (range.end.getOrElse(omEntry.fileAttribues.get.size) - range.start.getOrElse(0L))))
} else {
omEntry.fileAttribues.map(_.size)
}
logger.debug(s"Ranges is ${ranges}")
//log that we are starting a streamout
val auditFile = AuditFile(omEntry.oid, "")
auditActor ! actors.Audit.LogEvent(AuditEvent.STREAMOUT, uid, Some(auditFile), ranges)
getStreamingSource(ranges, userInfo.get, omEntry, auditFile, uid) match {
case Success(partialGraph) =>
Success((Source.fromGraph(partialGraph), responseSize, headersForEntry(omEntry, ranges, responseSize), getMaybeMimetype(omEntry), ranges.nonEmpty))
case Failure(err) => //if we did not get a source, log that
auditActor ! actors.Audit.LogEvent(AuditEvent.OMERROR, uid, Some(auditFile), ranges, notes = Some(err.getMessage))
logger.error(s"Could not set up streaming source: ", err)
Failure(new RuntimeException("Could not set up streaming source, see logs for more details"))
}
})
maybeResult match {
case Success((byteSource, maybeResponseSize, headers, maybeMimetype, isPartialTransfer)) =>
logger.debug(s"maybeResponseSize is $maybeResponseSize")
Result(
ResponseHeader(if (isPartialTransfer) 206 else 200, headers),
HttpEntity.Streamed(byteSource.log("outputstream").addAttributes(
Attributes.logLevels(
onElement = Attributes.LogLevels.Info,
onFailure = Attributes.LogLevels.Error,
onFinish = Attributes.LogLevels.Info)), None, maybeMimetype)
) //we can't give a proper content length, because if we are sending multipart chunks that adds overhead to the request size.
case Failure(err) => InternalServerError(GenericErrorResponse("error", err.getMessage).asJson)
}
}
}