app/streamcomponents/ProjectSummarySink.scala (72 lines of code) (raw):
package streamcomponents
import akka.stream.scaladsl.{GraphDSL, Source}
import akka.stream.{Attributes, Inlet, SinkShape, SourceShape}
import akka.stream.stage.{AbstractInHandler, GraphStage, GraphStageLogic, GraphStageWithMaterializedValue}
import com.om.mxs.client.japi.{SearchTerm, UserInfo}
import helpers.ContentSearchBuilder
import javax.activation.MimeType
import models.{CustomMXSMetadata, ObjectMatrixEntry, ProjectSummary}
import org.slf4j.LoggerFactory
import scala.concurrent.{Future, Promise}
import scala.util.{Success, Try}
/**
  * Sink stage that folds every incoming [[ObjectMatrixEntry]] into one running [[ProjectSummary]].
  *
  * The materialized value is a Future that completes with the accumulated summary once the stage has stopped.
  * If upstream fails, the Future fails with the upstream error instead of reporting a partial summary as a success.
  */
class ProjectSummarySink extends GraphStageWithMaterializedValue[SinkShape[ObjectMatrixEntry], Future[ProjectSummary]] {
  private final val in:Inlet[ObjectMatrixEntry] = Inlet.create("ProjectSummarySink.in")

  override def shape: SinkShape[ObjectMatrixEntry] = SinkShape.of(in)

  /**
    * Parse a MIME type string.
    * @param from the raw string to parse
    * @return the parsed MimeType, or None if constructing it threw
    */
  def makeMimeType(from:String): Option[MimeType] = Try { new MimeType(from) }.toOption

  override def createLogicAndMaterializedValue(inheritedAttributes: Attributes): (GraphStageLogic, Future[ProjectSummary]) = {
    val summaryPromise = Promise[ProjectSummary]()

    val logic = new GraphStageLogic(shape) {
      private val logger = LoggerFactory.getLogger(getClass)
      private var ongoingSummary = ProjectSummary()

      setHandler(in, new AbstractInHandler {
        override def onPush(): Unit = {
          val elem = grab(in)

          // Size lookup order: file attributes, then the numeric DPSP_SIZE field, then the string DPSP_SIZE field,
          // then zero. `orElse` takes its argument by name, so the string is only parsed when the earlier
          // sources are empty, and an unparseable value is logged and skipped rather than failing the stage.
          val itemSize = elem.fileAttribues.map(_.size)
            .orElse(elem.attributes.flatMap(_.longValues.get("DPSP_SIZE")))
            .orElse(elem.attributes.flatMap(_.stringValues.get("DPSP_SIZE")).flatMap(rawSize => {
              val parsed = Try { rawSize.toLong }.toOption
              if(parsed.isEmpty) logger.warn(s"Item ${elem.oid} has a non-numeric DPSP_SIZE value '$rawSize'")
              parsed
            }))
            .getOrElse(0L)

          // An empty extension string is counted under "unknown"
          val maybeXtn = elem.attributes
            .flatMap(_.stringValues.get("MXFS_FILEEXT"))
            .map(xtn=>if(xtn=="") "unknown" else xtn)

          val maybeMimeType = for {
            attrs <- elem.attributes
            typeString <- attrs.stringValues.get("MXFS_MIMETYPE")
            mimeType <- makeMimeType(typeString)
          } yield mimeType

          elem.attributes.flatMap(CustomMXSMetadata.fromMxsMetadata) match {
            case Some(customMeta)=>
              // Each step leaves the summary untouched when the corresponding value is absent
              ongoingSummary = customMeta.itemType.fold(ongoingSummary)(t=>ongoingSummary.addGnmType(t.toString, itemSize))
              ongoingSummary = customMeta.hidden.fold(ongoingSummary)(h=>ongoingSummary.addHiddenFile(h, itemSize))
              ongoingSummary = customMeta.projectId.fold(ongoingSummary)(p=>ongoingSummary.addGnmProject(p, itemSize))
              ongoingSummary = maybeXtn.fold(ongoingSummary)(x=>ongoingSummary.addFileType(x, itemSize))
              ongoingSummary = maybeMimeType.fold(ongoingSummary)(m=>ongoingSummary.addMediaType(m, itemSize))
              ongoingSummary = ongoingSummary.addToTotal(itemSize)
            case None=>
              // Entries without custom metadata are logged and contribute nothing to the summary
              logger.warn(s"Item ${elem.oid} has no custom metadata attributes")
          }
          pull(in)
        }

        // Fail the materialized Future before the stage stops, so that postStop cannot
        // report a truncated summary as a successful result.
        override def onUpstreamFailure(ex: Throwable): Unit = {
          summaryPromise.tryFailure(ex)
          failStage(ex)
        }
      })

      override def preStart(): Unit = pull(in)

      // tryComplete is a no-op when the promise was already failed by onUpstreamFailure
      override def postStop(): Unit = {
        summaryPromise.tryComplete(Success(ongoingSummary))
        ()
      }
    }
    (logic, summaryPromise.future)
  }
}
object ProjectSummarySink {
  /**
    * Metadata field names that [[suitableFastSource]] adds as keywords to the search query.
    */
  val usefulFields = Array(
    "GNM_TYPE", "GNM_HIDDEN_FILE", "GNM_PROJECT_ID",
    "MXFS_FILEEXT", "DPSP_SIZE", "MXFS_MIMETYPE"
  )

  /**
    * Build an Akka Source backed by an OMFastContentSearchSource for the query held in the given builder.
    * The field names in `usefulFields` are added to the query as keywords before it is built and handed
    * to the search source.
    * @param userInfo UserInfo object pointing to the appliance and vault to use
    * @param queryBuilder ContentSearchBuilder holding the query to run
    * @return an Akka Source that yields partially initialised ObjectMatrixEntry instances, intended for use with ProjectSummarySink
    */
  def suitableFastSource(userInfo:UserInfo, queryBuilder:ContentSearchBuilder) = {
    val searchQuery = queryBuilder.withKeywords(usefulFields).build

    val searchGraph = GraphDSL.create() { implicit builder =>
      val searchStage = builder.add(new OMFastContentSearchSource(userInfo, searchQuery))
      SourceShape(searchStage.out)
    }

    Source.fromGraph(searchGraph)
  }
}