override def shape: SourceShape[ObjectMatrixEntry] = SourceShape.of()

in app/streamcomponents/OMSearchSource.scala [21:86]


  override def shape: SourceShape[ObjectMatrixEntry] = SourceShape.of(out)

  override def createLogicAndMaterializedValue(inheritedAttributes: Attributes): (GraphStageLogic, Future[Int]) = {
    val promise = Promise[Int]()

    val logic = new GraphStageLogic(shape) {
      private val logger = LoggerFactory.getLogger(getClass)
      var vault: Option[Vault] = None
      var iterator: Option[Iterator[String]] = None
      var ctr:Int=0

      setHandler(out, new AbstractOutHandler {
        override def onPull(): Unit = {
          iterator match {
            case None =>
              logger.error(s"Can't iterate before connection was established")
              failStage(new RuntimeException)
            case Some(iter) =>
              if (iter.hasNext) {
                val oid = iter.next()
                val elem = ObjectMatrixEntry(oid)
                logger.debug(s"Got element $elem")
                push(out, elem)
                ctr+=1
              } else {
                logger.info(s"Completed iterating results")
                complete(out)
              }
          }
        }

      })

      override def preStart(): Unit = {
        //establish connection to OM
        try {
          logger.debug("OMSearchSource starting up")
          logger.info(s"Establishing connection to ${userInfo.getVault} on ${userInfo.getAddresses} as ${userInfo.getUser}")
          vault = Some(MatrixStore.openVault(userInfo))
          iterator = searchTerm match {
            case Some(actualSearchTerm)=>
              Some(vault.get.searchObjectsIterator(actualSearchTerm, atOnce).asScala)
            case None=>
              searchAttribute match {
                case Some(actualSearchAttribute)=>Some(vault.get.searchObjectsIterator(actualSearchAttribute,atOnce).asScala)
                case None=>Some(vault.get.searchObjectsIterator(new Attribute(Constants.CONTENT, "*"), atOnce).asScala)
              }
          }

          logger.info(s"Connection established")
        } catch {
          case ex: Throwable =>
            logger.error(s"Could not establish connection: ", ex)
            failStage(ex)
        }
      }

      override def postStop(): Unit = {
        logger.info("Search stream stopped")
        vault.map(_.dispose())
        promise.success(ctr)
      }
    }

    (logic, promise.future)
  }