app/streamcomponents/OMSearchSource.scala (68 lines of code) (raw):
package streamcomponents
import akka.stream.{Attributes, Outlet, SourceShape}
import akka.stream.stage.{AbstractOutHandler, GraphStage, GraphStageLogic, GraphStageWithMaterializedValue}
import com.om.mxs.client.japi.{Attribute, Constants, MatrixStore, SearchTerm, UserInfo, Vault}
import models.ObjectMatrixEntry
import org.slf4j.LoggerFactory
import scala.collection.JavaConverters._
import scala.concurrent.{Future, Promise}
/**
* Akka source that queries the given search term on the given ObjectMatrix cluster
* @param userInfo object matrix connection structure
* @param searchTerm term to search for
* @param atOnce number of results to pull at once
*/
class OMSearchSource (userInfo:UserInfo, searchTerm:Option[SearchTerm], searchAttribute:Option[Attribute], atOnce:Int=10) extends GraphStageWithMaterializedValue[SourceShape[ObjectMatrixEntry],Future[Int]]{
private final val out:Outlet[ObjectMatrixEntry] = Outlet.create("OMSearchSource.out")
override def shape: SourceShape[ObjectMatrixEntry] = SourceShape.of(out)
override def createLogicAndMaterializedValue(inheritedAttributes: Attributes): (GraphStageLogic, Future[Int]) = {
val promise = Promise[Int]()
val logic = new GraphStageLogic(shape) {
private val logger = LoggerFactory.getLogger(getClass)
var vault: Option[Vault] = None
var iterator: Option[Iterator[String]] = None
var ctr:Int=0
setHandler(out, new AbstractOutHandler {
override def onPull(): Unit = {
iterator match {
case None =>
logger.error(s"Can't iterate before connection was established")
failStage(new RuntimeException)
case Some(iter) =>
if (iter.hasNext) {
val oid = iter.next()
val elem = ObjectMatrixEntry(oid)
logger.debug(s"Got element $elem")
push(out, elem)
ctr+=1
} else {
logger.info(s"Completed iterating results")
complete(out)
}
}
}
})
override def preStart(): Unit = {
//establish connection to OM
try {
logger.debug("OMSearchSource starting up")
logger.info(s"Establishing connection to ${userInfo.getVault} on ${userInfo.getAddresses} as ${userInfo.getUser}")
vault = Some(MatrixStore.openVault(userInfo))
iterator = searchTerm match {
case Some(actualSearchTerm)=>
Some(vault.get.searchObjectsIterator(actualSearchTerm, atOnce).asScala)
case None=>
searchAttribute match {
case Some(actualSearchAttribute)=>Some(vault.get.searchObjectsIterator(actualSearchAttribute,atOnce).asScala)
case None=>Some(vault.get.searchObjectsIterator(new Attribute(Constants.CONTENT, "*"), atOnce).asScala)
}
}
logger.info(s"Connection established")
} catch {
case ex: Throwable =>
logger.error(s"Could not establish connection: ", ex)
failStage(ex)
}
}
override def postStop(): Unit = {
logger.info("Search stream stopped")
vault.map(_.dispose())
promise.success(ctr)
}
}
(logic, promise.future)
}
}