app/streamcomponents/OMFastSearchSourceBase.scala (66 lines of code) (raw):

package streamcomponents

import akka.stream.{Attributes, Outlet, SourceShape}
import akka.stream.stage.{AbstractOutHandler, GraphStage, GraphStageLogic}
import com.om.mxs.client.japi.{Attribute, Constants, MatrixStore, SearchTerm, UserInfo, Vault}
import models.{MxsMetadata, ObjectMatrixEntry}
import org.slf4j.LoggerFactory

import scala.collection.JavaConverters._
import scala.util.{Failure, Success}
import scala.util.control.NonFatal

/**
  * Base class for an Akka Streams source that opens a MatrixStore vault, runs a search built by the
  * subclass and emits one [[models.ObjectMatrixEntry]] per result string returned by the search iterator.
  *
  * The vault is opened in `preStart` and disposed in `postStop`.
  *
  * @param userInfo connection details handed to `MatrixStore.openVault`
  * @param atOnce   passed straight through as the second argument of `Vault.searchObjectsIterator`
  *                 (presumably the number of results fetched per round-trip - TODO confirm against the MatrixStore API)
  */
abstract class OMFastSearchSourceBase(userInfo:UserInfo, atOnce:Int=10) extends GraphStage[SourceShape[ObjectMatrixEntry]] {
  private final val out:Outlet[ObjectMatrixEntry] = Outlet.create("OMFastSearchSource.out")

  /**
    * This must be provided by a child class. It should return a single SearchTerm instance, made by combining
    * any required search terms. It is passed to the MatrixStore library for searching.
    * @return a SearchTerm instance
    */
  def getSearchTerms:SearchTerm

  override def shape: SourceShape[ObjectMatrixEntry] = SourceShape.of(out)

  override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new GraphStageLogic(shape) {
    private val logger:org.slf4j.Logger = LoggerFactory.getLogger(getClass)

    /**
      * Converts one raw search result into an ObjectMatrixEntry.
      *
      * The result is split on newlines. The first line is passed as the first argument of ObjectMatrixEntry
      * (presumably the object ID - TODO confirm); every following non-empty line is treated as `key=value`.
      * Only the FIRST `=` separates key from value, so values may themselves contain or end with `=`.
      * A line with no `=` yields the key with an empty value. If a key is repeated, the last occurrence wins.
      *
      * @param resultString raw string as produced by the search iterator
      * @return an ObjectMatrixEntry whose attributes carry the parsed string key/value pairs
      */
    def parseOutResults(resultString:String):ObjectMatrixEntry = {
      val parts = resultString.split("\n")
      // an Array has no useful toString, so render the elements explicitly
      logger.debug(s"parseOutResults: got ${parts.mkString(", ")}")

      val kvs:Map[String,String] = parts
        .drop(1)                // drop(1) rather than tail: split() yields an empty array for newline-only input
        .filter(_.nonEmpty)     // a blank line carries no key/value information
        .map(_.split("=", 2))   // limit 2 keeps everything after the first '=' intact, including trailing '='
        .map(kv => kv(0) -> (if (kv.length > 1) kv(1) else ""))
        .toMap
      logger.debug(s"got $kvs")

      val mxsMeta = MxsMetadata(kvs,Map(),Map(),Map(),Map())
      logger.debug(s"got $mxsMeta")

      // newline-only input gives no first line at all; fall back to the same empty value that "" input produces
      ObjectMatrixEntry(parts.headOption.getOrElse(""),attributes = Some(mxsMeta), fileAttribues = None)
    }

    // both are populated in preStart; vault is released in postStop
    private var vault: Option[Vault] = None
    private var iterator: Option[Iterator[String]] = None

    setHandler(out, new AbstractOutHandler {
      /**
        * Emits the next parsed search result, or completes the outlet once the iterator is exhausted.
        * Fails the stage if no iterator was set up in preStart.
        */
      override def onPull(): Unit = iterator match {
        case None =>
          logger.error(s"Can't iterate before connection was established")
          failStage(new RuntimeException("Can't iterate before connection was established"))
        case Some(iter) =>
          if (iter.hasNext) {
            val elem = parseOutResults(iter.next())
            logger.debug(s"Got element $elem")
            push(out, elem)
          } else {
            logger.info(s"Completed iterating results")
            complete(out)
          }
      }
    })

    /**
      * Opens the vault and starts the search. Any non-fatal failure is logged and fails the stage.
      */
    override def preStart(): Unit = {
      //establish connection to OM
      try {
        logger.debug("OMFastSearchSource starting up")
        logger.info(s"Establishing connection to ${userInfo.getVault} on ${userInfo.getAddresses} as ${userInfo.getUser}")
        vault = Some(MatrixStore.openVault(userInfo))

        val finalTerm = getSearchTerms
        iterator = vault.map(_.searchObjectsIterator(finalTerm, atOnce).asScala)
        logger.info("Connection established")
      } catch {
        // NonFatal lets VM-level errors (OutOfMemoryError, InterruptedException, ...) propagate untouched
        case NonFatal(ex) =>
          logger.error(s"Could not establish connection: ", ex)
          failStage(ex)
      }
    }

    /**
      * Disposes the vault, if one was opened.
      */
    override def postStop(): Unit = {
      logger.info("Search stream stopped")
      vault.foreach(_.dispose())
    }
  }
}