in app/streamcomponents/OMSearchSource.scala [21:86]
/** The shape of this stage: a source with the single outlet `out`. */
override def shape: SourceShape[ObjectMatrixEntry] = SourceShape(out)
/**
  * Builds the stage logic together with its materialized value.
  *
  * On start the logic opens the vault described by `userInfo` and runs a search:
  * `searchTerm` is used when defined, otherwise `searchAttribute`, otherwise an
  * `Attribute(Constants.CONTENT, "*")` query. `atOnce` is passed straight through
  * to `searchObjectsIterator`. Each OID yielded by the search is pushed downstream
  * wrapped in an [[ObjectMatrixEntry]].
  *
  * @param inheritedAttributes attributes supplied by the materializer (not used here)
  * @return the stage logic, and a future that is completed in `postStop` with the
  *         number of elements pushed. Note that the future is completed successfully
  *         with the count so far even when the stage fails; the failure itself is
  *         only signalled through the stream.
  */
override def createLogicAndMaterializedValue(inheritedAttributes: Attributes): (GraphStageLogic, Future[Int]) = {
  val promise = Promise[Int]()

  val logic = new GraphStageLogic(shape) {
    private val logger = LoggerFactory.getLogger(getClass)

    // Connection state; populated in preStart and released in postStop.
    var vault: Option[Vault] = None
    var iterator: Option[Iterator[String]] = None
    // Number of elements pushed downstream so far.
    var ctr: Int = 0

    setHandler(out, new AbstractOutHandler {
      override def onPull(): Unit = iterator match {
        case None =>
          // preStart did not manage to set up the search iterator.
          val problem = "Can't iterate before connection was established"
          logger.error(problem)
          failStage(new RuntimeException(problem))
        case Some(iter) if iter.hasNext =>
          val elem = ObjectMatrixEntry(iter.next())
          logger.debug(s"Got element $elem")
          push(out, elem)
          ctr += 1
        case Some(_) =>
          logger.info("Completed iterating results")
          complete(out)
      }
    })

    override def preStart(): Unit = {
      // Establish the connection to OM and start the search.
      try {
        logger.debug("OMSearchSource starting up")
        logger.info(s"Establishing connection to ${userInfo.getVault} on ${userInfo.getAddresses} as ${userInfo.getUser}")

        val openedVault = MatrixStore.openVault(userInfo)
        // Record the vault before searching so postStop can dispose it even if the search throws.
        vault = Some(openedVault)

        val results = searchTerm match {
          case Some(actualSearchTerm) =>
            openedVault.searchObjectsIterator(actualSearchTerm, atOnce).asScala
          case None =>
            searchAttribute match {
              case Some(actualSearchAttribute) =>
                openedVault.searchObjectsIterator(actualSearchAttribute, atOnce).asScala
              case None =>
                openedVault.searchObjectsIterator(new Attribute(Constants.CONTENT, "*"), atOnce).asScala
            }
        }
        iterator = Some(results)

        logger.info("Connection established")
      } catch {
        // Kept deliberately broad: whatever goes wrong while connecting is logged
        // and turned into a stage failure instead of escaping preStart.
        case ex: Throwable =>
          logger.error("Could not establish connection: ", ex)
          failStage(ex)
      }
    }

    override def postStop(): Unit = {
      logger.info("Search stream stopped")
      // postStop also runs after failStage (Akka stage lifecycle), so the vault is released here.
      // The finally block guarantees the materialized future completes even if dispose() throws;
      // otherwise anything waiting on it would hang forever.
      try {
        vault.foreach(_.dispose())
      } finally {
        promise.trySuccess(ctr)
      }
    }
  }

  (logic, promise.future)
}