app/mxscopy/streamcomponents/OMFastContentSearchSource.scala (64 lines of code) (raw):
package mxscopy.streamcomponents
import akka.stream.{Attributes, Outlet, SourceShape}
import akka.stream.stage.{AbstractOutHandler, GraphStage, GraphStageLogic}
import com.om.mxs.client.japi.{Attribute, Constants, MatrixStore, SearchTerm, UserInfo, Vault}
import mxscopy.models.{MxsMetadata, ObjectMatrixEntry}
import org.slf4j.LoggerFactory
import scala.collection.JavaConverters._
import scala.util.{Failure, Success}
class OMFastContentSearchSource(vault:Vault, contentSearchString:String, keywords:Array[String], atOnce:Int=100) extends GraphStage[SourceShape[ObjectMatrixEntry]] {
private final val out:Outlet[ObjectMatrixEntry] = Outlet.create("OMFastSearchSource.out")
override def shape: SourceShape[ObjectMatrixEntry] = SourceShape.of(out)
override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new GraphStageLogic(shape) {
private val logger:org.slf4j.Logger = LoggerFactory.getLogger(getClass)
def parseOutResults(resultString:String) = {
logger.debug(s"parseOutResults: got $resultString")
val parts = resultString.split("\n")
val kvs = parts.tail
.map(_.split("="))
.foldLeft(Map[String,String]()) ((acc,elem)=>acc ++ Map(elem.head -> elem.tail.mkString("=")))
logger.debug(s"got $kvs")
val mxsMeta = MxsMetadata(kvs,Map(),Map(),Map())
logger.debug(s"got $mxsMeta")
ObjectMatrixEntry(parts.head, attributes = Some(mxsMeta), fileAttribues = None)
}
var iterator: Option[Iterator[String]] = None
setHandler(out, new AbstractOutHandler {
override def onPull(): Unit = {
iterator match {
case None =>
logger.error(s"Can't iterate before connection was established")
failStage(new RuntimeException)
case Some(iter) =>
if (iter.hasNext) {
val resultString = iter.next()
val elem = parseOutResults(resultString)
logger.debug(s"Got element $elem")
push(out, elem)
} else {
logger.info(s"Completed iterating results")
complete(out)
}
}
}
})
override def preStart(): Unit = {
//establish connection to OM
try {
logger.debug("OMFastSearchSource starting up")
logger.info(s"Establishing connection to ${vault.getId}")
val searchTermString = if(keywords.nonEmpty) {
//for some reason, we always lose whatever keyword we put on the start. So add in a dummy one.
contentSearchString + "\n" + "keywords: " + (Seq("__mxs__id") ++ keywords).mkString(",")
} else {
contentSearchString
}
logger.debug(s"search string is '$searchTermString'")
iterator = Some(vault.searchObjectsIterator(SearchTerm.createSimpleTerm(Constants.CONTENT, searchTermString), atOnce).asScala)
logger.info("Connection established")
} catch {
case ex: Throwable =>
logger.error(s"Could not establish connection: ", ex)
failStage(ex)
}
}
}
}