app/streamcomponents/MultipartSource.scala (51 lines of code) (raw):

package streamcomponents import akka.NotUsed import akka.stream.SourceShape import akka.stream.scaladsl.{Concat, GraphDSL, Source} import akka.util.ByteString import com.om.mxs.client.japi.UserInfo import helpers.RangeHeader import helpers.RandomExtender._ import models.ObjectMatrixEntry import org.slf4j.LoggerFactory object MultipartSource { private val logger = LoggerFactory.getLogger(getClass) /** * generates a 10-character random string used as a section boundary * @return the string */ def genSeparatorText: String = { val length = 10 val r = new scala.util.Random val a = new Array[Char](length) for (i <- 0 until length) { a(i) = r.nextAlphaChar } a.mkString } def sourceForRange(range:RangeHeader, userInfo:UserInfo, omEntry:ObjectMatrixEntry) = { val graph = GraphDSL.create() { implicit builder=> val src = builder.add(new MatrixStoreFileSourceWithRanges(userInfo, omEntry.oid, omEntry.fileAttribues.get.size, Seq(range))) SourceShape(src.out) } Source.fromGraph(graph) } /** * builds a sequence of (RangeHeader, Source[ByteString]) tuples out of a sequence of [[RangeHeader]] by building a [[MatrixStoreFileSourceWithRanges]] for each entry * @param ranges sequence of RangeHeaders giving the ranges to read * @param userInfo UserInfo object giving the ObjectMatrix appliance and vault to access * @param omEntry [[ObjectMatrixEntry]] instance giving the file to stream * @return a sequence of (RangeHeader, Source[ByteString]) suitable for passing to getStreamingSource */ def makeSources(ranges:Seq[RangeHeader], userInfo:UserInfo, omEntry:ObjectMatrixEntry) = ranges.map(range=>(range,sourceForRange(range,userInfo, omEntry))) /** * builds a single streaming Source for a multipart from the list of sources and ranges provided. * @param rangeAndSource sequence of tuples, each has a RangeHeader representing the range and a Source that will grab the byte content of said range. * @param totalSize total size of the streaming file, for building headers * @param contentType content-type of the streaming file. * @param separator randomised section separator text. Get this by calling `MultipartSource.genSeparatorText`. * @return a single Source[ByteString,NotUsed] that will yield the multipart response body */ def getSource(rangeAndSource:Seq[(RangeHeader, Source[ByteString,NotUsed])], totalSize:Long, contentType:String, separator:String) = { var n=0 val fullSourcesList:Seq[Source[ByteString,NotUsed]] = rangeAndSource.foldLeft[Seq[Source[ByteString, NotUsed]]](Seq())((acc,entry)=>{ n+=1 acc++Seq( Source.single(makeSectionHeader(entry._1, totalSize, contentType, separator)), entry._2 ) }) ++ Seq(Source.single(ByteString(s"\r\n--$separator--"))) logger.debug(s"Got sources list: $fullSourcesList") Source.combine(fullSourcesList.head, fullSourcesList.tail.head, fullSourcesList.tail.tail:_*)(Concat(_)) } /** * returns a ByteString of an individual section header, consisting of the top separator, Content-Type and Content-Range headers. * @param range [[RangeHeader]] instance giving the range that this section represents * @param totalSize total size of the streamed file * @param contentType content type of the streamed file * @param separator the seperator string, generated by `genSeperatorText`. This must be the same for all parts of the response * @return a ByteString containing the header */ def makeSectionHeader(range:RangeHeader, totalSize:Long, contentType:String, separator:String):ByteString = { val absRange = range.getAbsolute(totalSize) ByteString(s""" |--$separator |Content-Type: $contentType |Content-Range: bytes ${absRange._1}-${absRange._2}/$totalSize | |""".stripMargin.replace("\n","\r\n")) } }