in app/streamcomponents/ProjectSummarySink.scala [23:70]
/**
  * Builds the stage logic for this sink together with its materialized value.
  *
  * Every element grabbed from `in` is folded into a running `ProjectSummary`. The item size is taken
  * from the first of these sources that is present: the element's file attributes, the `DPSP_SIZE`
  * long metadata value, the `DPSP_SIZE` string metadata value; otherwise it is counted as 0.
  * Elements with no custom metadata are logged and not added to the summary.
  *
  * The returned Future is completed with the accumulated summary when the stage stops, or failed
  * with the upstream error if the upstream fails.
  *
  * @param inheritedAttributes attributes handed over at materialization; not read by this stage
  * @return the stage logic and a Future of the final summary
  */
override def createLogicAndMaterializedValue(inheritedAttributes: Attributes): (GraphStageLogic, Future[ProjectSummary]) = {
  val summaryPromise = Promise[ProjectSummary]()

  val logic = new GraphStageLogic(shape) {
    private val logger = LoggerFactory.getLogger(getClass)
    private var ongoingSummary = ProjectSummary()

    setHandler(in, new AbstractInHandler {
      override def onPush(): Unit = {
        val elem = grab(in)

        val maybeSizeFromAttribs = elem.fileAttribues.map(_.size)
        val maybeSizeFromMeta = elem.attributes.flatMap(_.longValues.get("DPSP_SIZE"))

        // Declared as a def so the string is only parsed when it is actually needed as the last
        // fallback. A malformed value must not fail the whole stream, so it is logged and ignored.
        def maybeSizeFromStringMeta = elem.attributes
          .flatMap(_.stringValues.get("DPSP_SIZE"))
          .flatMap(raw => {
            val parsed = scala.util.Try(raw.toLong).toOption
            if (parsed.isEmpty) logger.warn(s"Item ${elem.oid} has a non-numeric DPSP_SIZE value '$raw', ignoring it")
            parsed
          })

        // getOrElse takes its default by name, so each fallback is only evaluated if the previous one is empty.
        val itemSize = maybeSizeFromAttribs.getOrElse(
          maybeSizeFromMeta.getOrElse(
            maybeSizeFromStringMeta.getOrElse(0L)
          )
        )

        // An empty extension is reported under the "unknown" bucket.
        val maybeXtn = elem.attributes
          .flatMap(_.stringValues.get("MXFS_FILEEXT"))
          .map(xtn => if (xtn == "") "unknown" else xtn)

        val maybeMimeType = for {
          attrs <- elem.attributes
          typeString <- attrs.stringValues.get("MXFS_MIMETYPE")
          mimeType <- makeMimeType(typeString)
        } yield mimeType

        elem.attributes.flatMap(CustomMXSMetadata.fromMxsMetadata) match {
          case Some(customMeta) =>
            ongoingSummary = customMeta.itemType.map(t => ongoingSummary.addGnmType(t.toString, itemSize)).getOrElse(ongoingSummary)
            ongoingSummary = customMeta.hidden.map(h => ongoingSummary.addHiddenFile(h, itemSize)).getOrElse(ongoingSummary)
            ongoingSummary = customMeta.projectId.map(p => ongoingSummary.addGnmProject(p, itemSize)).getOrElse(ongoingSummary)
            ongoingSummary = maybeXtn.map(x => ongoingSummary.addFileType(x, itemSize)).getOrElse(ongoingSummary)
            ongoingSummary = maybeMimeType.map(x => ongoingSummary.addMediaType(x, itemSize)).getOrElse(ongoingSummary)
            ongoingSummary = ongoingSummary.addToTotal(itemSize)
          case None =>
            logger.warn(s"Item ${elem.oid} has no custom metadata attributes")
        }

        pull(in)
      }

      // Surface an upstream failure through the materialized Future instead of reporting a
      // partial summary as if the stream had completed normally.
      override def onUpstreamFailure(ex: Throwable): Unit = {
        summaryPromise.tryFailure(ex)
        failStage(ex)
      }
    })

    override def preStart(): Unit = pull(in)

    // tryComplete rather than complete: the promise may already have been failed by
    // onUpstreamFailure, in which case this is a no-op instead of an IllegalStateException.
    override def postStop(): Unit = {
      summaryPromise.tryComplete(Success(ongoingSummary))
    }
  }

  (logic, summaryPromise.future)
}