app/streamcomponents/MakeDownloadSynopsis.scala (48 lines of code) (raw):
package streamcomponents
import akka.stream.{Attributes, FlowShape, Inlet, Outlet}
import akka.stream.stage.{AbstractInHandler, AbstractOutHandler, GraphStage, GraphStageLogic}
import models.{ArchiveEntryDownloadSynopsis, ObjectMatrixEntry}
import org.slf4j.LoggerFactory
class MakeDownloadSynopsis (maybeStripPrefixes:Option[Seq[String]]) extends GraphStage[FlowShape[ObjectMatrixEntry, ArchiveEntryDownloadSynopsis]] {
private final val in:Inlet[ObjectMatrixEntry] = Inlet.create("MakeDownloadSynopsis.in")
private final val out:Outlet[ArchiveEntryDownloadSynopsis] = Outlet.create("ArchiveEntryDownloadSynopsis")
override def shape: FlowShape[ObjectMatrixEntry, ArchiveEntryDownloadSynopsis] = FlowShape.of(in,out)
/**
* recursively iterate the provided strip prefixes and remove the first one that matches
* @param filePath filepath determined from the appliance
* @return
*/
def strippedPrefix(filePath:String) = maybeStripPrefixes match {
case None=>filePath
case Some(prefixesToRemove)=>
def stripNextPrefix(remainingList:Seq[String], onBaseString:String):String = {
if(remainingList.isEmpty){
onBaseString
} else {
val updatedString = if(onBaseString.startsWith(remainingList.head)){
return onBaseString.substring(remainingList.head.length)
} else {
onBaseString
}
stripNextPrefix(remainingList.tail, onBaseString)
}
}
stripNextPrefix(prefixesToRemove, filePath)
}
override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new GraphStageLogic(shape) {
private val logger:org.slf4j.Logger = LoggerFactory.getLogger(getClass)
def determineSize(entry:ObjectMatrixEntry) = {
val maybeFileAttrSize = entry.fileAttribues.map(_.size)
val maybeMetaSize = entry.attributes.flatMap(_.longValues.get("DPSP_SIZE"))
val maybeMetaStringSize = entry.attributes.flatMap(_.stringValues.get("DPSP_SIZE").map(_.toLong))
maybeFileAttrSize.getOrElse(maybeMetaSize.getOrElse(maybeMetaStringSize.getOrElse(0L)))
}
setHandler(out, new AbstractOutHandler {
override def onPull(): Unit = pull(in)
})
setHandler(in, new AbstractInHandler {
override def onPush(): Unit = {
val elem = grab(in)
val maybeFilePath = elem.attributes.flatMap(_.stringValues.get("MXFS_PATH").map(strippedPrefix))
val maybeFileSize = determineSize(elem)
val result = ArchiveEntryDownloadSynopsis(elem.oid, maybeFilePath.getOrElse(""), maybeFileSize)
push(out, result)
}
})
}
}