in mxs-copy-components/src/main/scala/com/gu/multimedia/mxscopy/streamcomponents/OMLookupMetadata.scala [22:78]
/** Flow shape of this stage: a single inlet (`in`) and a single outlet (`out`), both carrying [[ObjectMatrixEntry]]. */
override def shape: FlowShape[ObjectMatrixEntry, ObjectMatrixEntry] = FlowShape(in, out)
/**
  * Builds the processing logic for this stage.
  *
  * For each incoming [[ObjectMatrixEntry]] the object is fetched from the vault by its `oid`, its attribute
  * metadata is looked up asynchronously, and a copy of the entry with `attributes` and `fileAttribues`
  * populated is pushed downstream. Any failure during the lookup fails the stage.
  *
  * The vault is opened in `preStart` and disposed in `postStop`.
  *
  * @param inheritedAttributes stream attributes handed over by the materializer (not used here)
  * @return the stage logic bound to `shape`
  */
override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new GraphStageLogic(shape) {
  private val logger = LoggerFactory.getLogger(getClass)
  // Set in preStart; stays None if the stage never started or the vault could not be opened.
  private var vault: Option[Vault] = None
  // True from the moment an element is grabbed until its result has been handed back to the stage.
  // Used to hold back stage completion so the pending element is not dropped.
  private var lookupInFlight = false

  // Re-enters the stage with a finished lookup and emits the enriched entry.
  val completeCb = getAsyncCallback[(ObjectMatrixEntry,MxsMetadata,MXFSFileAttributes)] { case (entry, meta, fileAttrs) =>
    lookupInFlight = false
    val updated = entry.copy(
      attributes = Some(meta),
      fileAttribues = Some(FileAttributes(fileAttrs))
    )
    push(out, updated)
    // Upstream finished while this lookup was pending; completion was deferred until now.
    if (isClosed(in)) completeStage()
  }

  // Re-enters the stage with a failed lookup and fails the stage.
  val failedCb = getAsyncCallback[Throwable](err => failStage(err))

  setHandler(in, new AbstractInHandler {
    override def onPush(): Unit = {
      val elem = grab(in)
      vault match {
        case None =>
          val err = new IllegalStateException("Vault connection is not open")
          logger.error("Could not look up object metadata: ", err)
          failStage(err)
        case Some(openVault) =>
          lookupInFlight = true
          try {
            val obj = openVault.getObject(elem.oid)
            MetadataHelper.getAttributeMetadata(obj).onComplete({
              case Success(meta) =>
                // This runs outside the stage, so an exception thrown here would otherwise be lost
                // and the stage would never push or fail. Capture it and report it explicitly.
                scala.util.Try(MetadataHelper.getMxfsMetadata(obj)) match {
                  case Success(fileAttrs) =>
                    completeCb.invoke((elem, meta, fileAttrs))
                  case Failure(exception) =>
                    logger.error("Could not look up metadata: ", exception)
                    failedCb.invoke(exception)
                }
              case Failure(exception) =>
                logger.error("Could not look up metadata: ", exception)
                failedCb.invoke(exception)
            })
          } catch {
            case scala.util.control.NonFatal(err) =>
              logger.error("Could not look up object metadata: ", err)
              failStage(err)
          }
      }
    }

    // The default implementation completes the stage immediately, which would discard the element
    // whose lookup is still pending. Completion is deferred to completeCb in that case.
    override def onUpstreamFinish(): Unit = {
      if (!lookupInFlight) completeStage()
    }
  })

  setHandler(out, new AbstractOutHandler {
    override def onPull(): Unit = pull(in)
  })

  /** Opens the vault connection; logs and rethrows if that fails, so the stage fails to start. */
  override def preStart(): Unit = {
    try {
      vault = Option(MatrixStore.openVault(userInfo))
    } catch {
      case scala.util.control.NonFatal(err) =>
        logger.error(s"Could not connect to vault ${userInfo.getVault} as ${userInfo.getUser}: ${err.getMessage}", err)
        throw err
    }
  }

  /** Disposes of the vault connection if one was opened. */
  override def postStop(): Unit = {
    vault.foreach(_.dispose())
  }
}