override def shape: FlowShape[ObjectMatrixEntry, ObjectMatrixEntry] = FlowShape.of()

in mxs-copy-components/src/main/scala/com/gu/multimedia/mxscopy/streamcomponents/OMLookupMetadata.scala [22:78]


  /** Flow shape of this stage: one inlet (`in`) and one outlet (`out`), both carrying [[ObjectMatrixEntry]]. */
  override def shape: FlowShape[ObjectMatrixEntry, ObjectMatrixEntry] =
    FlowShape(in, out)

  /**
    * Builds the stage logic. For every incoming entry the object is fetched from the vault by its `oid`,
    * its attribute metadata is looked up asynchronously and its MXFS file attributes are read; a copy of
    * the entry with both attached is then pushed downstream.
    *
    * The vault is opened in `preStart` and disposed in `postStop`. Any failure during lookup fails the stage.
    *
    * @param inheritedAttributes stream attributes passed in by the materializer (not used by this stage)
    * @return the logic instance driving this stage
    */
  override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new GraphStageLogic(shape) {
    private val logger = LoggerFactory.getLogger(getClass)

    // Set in preStart, disposed in postStop. Stays None if the connection was never established.
    private var vault: Option[Vault] = None

    // True from the moment an asynchronous lookup is started until its result has been pushed.
    // Only touched from stage callbacks, so no extra synchronisation is needed.
    private var lookupInFlight = false

    // Re-enters the stage with the lookup results and emits the updated entry.
    val completeCb = getAsyncCallback[(ObjectMatrixEntry,MxsMetadata,MXFSFileAttributes)](argTuple=>{
      val (entry, meta, mxfsAttributes) = argTuple
      val updated = entry.copy(
        attributes = Some(meta),
        fileAttribues = Some(FileAttributes(mxfsAttributes))
      )
      lookupInFlight = false
      push(out, updated)
      // Upstream may have finished while the lookup was running; completion was deferred until now
      // so that this element is not lost.
      if (isClosed(in)) completeStage()
    })

    // Re-enters the stage to fail it when the asynchronous lookup fails.
    val failedCb = getAsyncCallback[Throwable](err=>failStage(err))

    setHandler(in, new AbstractInHandler {
      override def onPush(): Unit = {
        val elem = grab(in)

        try {
          val obj = vault
            .getOrElse(throw new IllegalStateException("Vault connection is not open"))
            .getObject(elem.oid)

          lookupInFlight = true
          // getMxfsMetadata runs inside `map` so that an exception from it becomes a failed Future
          // and reaches failedCb, instead of being thrown inside the completion callback where
          // neither callback would fire and the stage would stall.
          MetadataHelper.getAttributeMetadata(obj)
            .map(meta => (elem, meta, MetadataHelper.getMxfsMetadata(obj)))
            .onComplete({
              case Success(result)=>
                completeCb.invoke(result)
              case Failure(exception)=>
                logger.error("Could not look up metadata: ", exception)
                failedCb.invoke(exception)
            })

        } catch {
          case err:Throwable=>
            logger.error("Could not look up object metadata: ", err)
            failStage(err)
        }
      }

      override def onUpstreamFinish(): Unit = {
        // The default implementation completes the stage immediately, which would drop the result of a
        // lookup that is still running. Defer completion to completeCb in that case.
        if (!lookupInFlight) completeStage()
      }
    })

    setHandler(out, new AbstractOutHandler {
      override def onPull(): Unit = pull(in)
    })

    override def preStart(): Unit = {
      try {
        vault = Option(MatrixStore.openVault(userInfo))
      } catch {
        // The error is rethrown unchanged; it is caught here only to log the connection details.
        case err:Throwable=>
          logger.error(s"Could not connect to vault ${userInfo.getVault} as ${userInfo.getUser}: ${err.getMessage}", err)
          throw err
      }
    }

    override def postStop(): Unit = {
      vault.foreach(_.dispose())
    }
  }