override def shape: FlowShape[ObjectMatrixEntry, ObjectMatrixEntry] = FlowShape.of(in,out)

in app/streamcomponents/OMLookupMetadata.scala [24:85]


  /** Flow shape of this stage, built from its `in` inlet and `out` outlet. */
  override def shape: FlowShape[ObjectMatrixEntry, ObjectMatrixEntry] = FlowShape(in, out)

  /**
    * Creates the stage logic that enriches each incoming [[ObjectMatrixEntry]] with metadata
    * read from the vault.
    *
    * The vault is opened in `preStart` and disposed in `postStop`. For every element the object is
    * fetched by its `oid`, and a copy of the element carrying the object's attribute metadata and
    * MXFS file attributes is pushed downstream.
    *
    * Error 311 (logged as 'unable to lock object') is treated as transient. When `ignoreOnLocked`
    * is set the element is dropped and the next one is pulled; otherwise the lookup is repeated
    * after a one second pause until it stops failing with that error. Any other non-fatal error
    * is logged and fails the stage.
    *
    * @param inheritedAttributes attributes supplied at materialization; not used by this stage
    * @return the logic instance driving this stage
    */
  override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new GraphStageLogic(shape) {
    import scala.annotation.tailrec
    import scala.util.control.NonFatal

    private val logger = LoggerFactory.getLogger(getClass)
    private var vault:Option[Vault] = None

    // Error code reported alongside the 'unable to lock object' condition.
    private val LockedErrorCode = 311
    // Pause between attempts while an object stays locked.
    private val RetryDelayMillis = 1000L

    /**
      * True when `err` reports the 'unable to lock object' condition, either through the tagged
      * error code or through the exception message. The message may be null, hence the Option wrap.
      */
    private def isLockedError(err: Throwable): Boolean = err match {
      case tagged: TaggedIOException if tagged.getError == LockedErrorCode => true
      case io: java.io.IOException => Option(io.getMessage).exists(_.contains("error 311"))
      case _ => false
    }

    /** Fetches the object for `elem` and returns a copy of `elem` carrying its metadata. */
    private def lookupMetadata(elem: ObjectMatrixEntry): ObjectMatrixEntry = {
      val openedVault = vault.getOrElse(throw new IllegalStateException("Vault has not been opened"))
      val obj = openedVault.getObject(elem.oid)

      val meta = MetadataHelper.getAttributeMetadataSync(obj)
      elem.copy(attributes = Some(meta), fileAttribues = Some(FileAttributes(MetadataHelper.getMxfsMetadata(obj))))
    }

    /**
      * Looks up `elem` and pushes the enriched copy, retrying for as long as the object is locked.
      *
      * The attempt is captured as a value first so that the retry is a genuine tail call; retrying
      * from inside a catch block would add a stack frame per attempt.
      */
    @tailrec
    private def lookupWithRetry(elem: ObjectMatrixEntry): Unit = {
      val outcome: Either[Throwable, ObjectMatrixEntry] =
        try {
          Right(lookupMetadata(elem))
        } catch {
          case NonFatal(err) => Left(err)
        }

      outcome match {
        case Right(updated) =>
          push(out, updated)
        case Left(err) if isLockedError(err) =>
          if (ignoreOnLocked) {
            logger.error(s"OMLookupMetadata got 'unable to lock object' on ${elem.oid}, skipping")
            // Drop this element and ask upstream for the next one.
            pull(in)
          } else {
            logger.error(s"OMLookupMetadata got 'unable to lock object' on ${elem.oid}, retrying")
            // NOTE(review): this blocks the thread running the stage, as the original did.
            // A timer-based retry would avoid that but also needs upstream-finish handling.
            Thread.sleep(RetryDelayMillis)
            lookupWithRetry(elem)
          }
        case Left(err) =>
          logger.error(s"Could not look up object metadata: ", err)
          failStage(err)
      }
    }

    setHandler(in, new AbstractInHandler {
      override def onPush(): Unit = lookupWithRetry(grab(in))
    })

    setHandler(out, new AbstractOutHandler {
      override def onPull(): Unit = pull(in)
    })

    override def preStart(): Unit = {
      vault = Some(MatrixStore.openVault(userInfo))
    }

    override def postStop(): Unit = {
      logger.info("Stream terminated")
      // foreach rather than map: dispose() is called purely for its side effect.
      vault.foreach(_.dispose())
    }
  }