app/streamcomponents/MultipartSource.scala (51 lines of code) (raw):
package streamcomponents
import akka.NotUsed
import akka.stream.SourceShape
import akka.stream.scaladsl.{Concat, GraphDSL, Source}
import akka.util.ByteString
import com.om.mxs.client.japi.UserInfo
import helpers.RangeHeader
import helpers.RandomExtender._
import models.ObjectMatrixEntry
import org.slf4j.LoggerFactory
/**
  * Helpers for assembling the body of a multipart (byte-range) response as a single Akka Streams Source:
  * each requested range becomes a section header followed by the bytes of that range, and the whole body
  * is finished off with a closing delimiter.
  */
object MultipartSource {
  private val logger = LoggerFactory.getLogger(getClass)

  /**
    * generates a 10-character random string used as a section boundary.
    * Each character comes from `nextAlphaChar`, which is brought in by `helpers.RandomExtender._`.
    * @return the string
    */
  def genSeparatorText: String = {
    val length = 10
    val rng = new scala.util.Random
    List.fill(length)(rng.nextAlphaChar).mkString
  }

  /**
    * builds a Source that wraps a single [[MatrixStoreFileSourceWithRanges]] reading only the given range.
    * @param range [[RangeHeader]] giving the range to read
    * @param userInfo UserInfo object passed through to the [[MatrixStoreFileSourceWithRanges]]
    * @param omEntry [[ObjectMatrixEntry]] instance giving the file to stream; its `oid` and the `size` from its
    *                file attributes are passed to the underlying source
    * @return a Source built from a graph whose only stage is the [[MatrixStoreFileSourceWithRanges]]
    */
  def sourceForRange(range:RangeHeader, userInfo:UserInfo, omEntry:ObjectMatrixEntry) = {
    val graph = GraphDSL.create() { implicit builder=>
      // NOTE(review): `fileAttribues.get` looks like an Option access that will throw if the attributes are absent -
      // confirm that callers only pass entries whose attributes have been loaded.
      val fileSource = builder.add(new MatrixStoreFileSourceWithRanges(userInfo, omEntry.oid, omEntry.fileAttribues.get.size, Seq(range)))
      SourceShape(fileSource.out)
    }
    Source.fromGraph(graph)
  }

  /**
    * builds a sequence of (RangeHeader, Source) tuples out of a sequence of [[RangeHeader]] by calling
    * `sourceForRange` for each entry
    * @param ranges sequence of RangeHeaders giving the ranges to read
    * @param userInfo UserInfo object passed through to `sourceForRange`
    * @param omEntry [[ObjectMatrixEntry]] instance giving the file to stream
    * @return a sequence of (RangeHeader, Source) tuples, in the same order as `ranges`, suitable for passing to `getSource`
    */
  def makeSources(ranges:Seq[RangeHeader], userInfo:UserInfo, omEntry:ObjectMatrixEntry) =
    ranges.map(range => (range, sourceForRange(range, userInfo, omEntry)))

  /**
    * builds a single streaming Source for a multipart from the list of sources and ranges provided.
    * The output is, for every entry in order, the section header from `makeSectionHeader` followed by the content
    * of the entry's Source, and finally the closing delimiter (CRLF, two dashes, the separator, two dashes).
    * If `rangeAndSource` is empty the result contains only the closing delimiter.
    * @param rangeAndSource sequence of tuples, each has a RangeHeader representing the range and a Source that will grab the byte content of said range.
    * @param totalSize total size of the streaming file, for building headers
    * @param contentType content-type of the streaming file.
    * @param separator randomised section separator text. Get this by calling `MultipartSource.genSeparatorText`.
    * @return a single Source[ByteString,NotUsed] that will yield the multipart response body
    */
  def getSource(rangeAndSource:Seq[(RangeHeader, Source[ByteString,NotUsed])], totalSize:Long, contentType:String, separator:String): Source[ByteString, NotUsed] = {
    val closingDelimiter: Source[ByteString, NotUsed] = Source.single(ByteString(s"\r\n--$separator--"))

    // header-then-content for every range, in the order given, with the closing delimiter at the very end
    val fullSourcesList: List[Source[ByteString, NotUsed]] = rangeAndSource.toList.flatMap { case (range, content) =>
      List(
        Source.single(makeSectionHeader(range, totalSize, contentType, separator)),
        content
      )
    } :+ closingDelimiter

    logger.debug(s"Got sources list: $fullSourcesList")

    fullSourcesList match {
      case first :: second :: rest =>
        Source.combine(first, second, rest:_*)(Concat(_))
      case only :: Nil =>
        // no ranges were supplied, so there is nothing to combine: the body is just the closing delimiter
        only
      case Nil =>
        // cannot happen because the closing delimiter is always appended; present to keep the match total
        Source.empty[ByteString]
    }
  }

  /**
    * returns a ByteString of an individual section header, consisting of the top separator, Content-Type and Content-Range headers.
    * The header starts with a line break, every line break is emitted as CRLF, and the header ends with an empty line.
    * The start and end of the Content-Range come from `range.getAbsolute(totalSize)`.
    * @param range [[RangeHeader]] instance giving the range that this section represents
    * @param totalSize total size of the streamed file
    * @param contentType content type of the streamed file
    * @param separator the separator string, generated by `genSeparatorText`. This must be the same for all parts of the response
    * @return a ByteString containing the header
    */
  def makeSectionHeader(range:RangeHeader, totalSize:Long, contentType:String, separator:String):ByteString = {
    val absRange = range.getAbsolute(totalSize)
    // the template is written with plain newlines and converted to CRLF in one go afterwards
    val headerText = s"""
                        |--$separator
                        |Content-Type: $contentType
                        |Content-Range: bytes ${absRange._1}-${absRange._2}/$totalSize
                        |
                        |""".stripMargin.replace("\n","\r\n")
    ByteString(headerText)
  }
}