in app/streamcomponents/OMLookupMetadata.scala [24:85]
/** Single-inlet, single-outlet shape of this stage, built from its `in` and `out` ports. */
override def shape: FlowShape[ObjectMatrixEntry, ObjectMatrixEntry] = FlowShape(in, out)
/**
  * Builds the stage logic. For every incoming entry the object is fetched from the vault by its `oid`
  * and the entry is re-emitted with `attributes` and `fileAttribues` populated from `MetadataHelper`.
  *
  * "Unable to lock object" failures (error 311) are handled according to `ignoreOnLocked`:
  * when set, the element is dropped and the next one is requested; otherwise the lookup is retried
  * after a one second pause until it succeeds. Any other non-fatal failure fails the stage.
  *
  * @param inheritedAttributes attributes supplied at materialization; not used by this stage
  * @return the logic that drives this stage
  */
override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new GraphStageLogic(shape) {
  private val logger = LoggerFactory.getLogger(getClass)
  private var vault: Option[Vault] = None

  /** Error code reported when an object cannot be locked. */
  private val LockedErrorCode = 311
  /** Pause between retries of a locked object, in milliseconds. */
  private val RetryDelayMillis = 1000L

  /**
    * Decides whether a failure means "unable to lock object".
    * The typed check comes first so it is never shadowed by the generic IOException check; a
    * null exception message is treated as "not a lock error" rather than raising an NPE.
    */
  private def isLockError(err: Throwable): Boolean = err match {
    case tagged: TaggedIOException if tagged.getError == LockedErrorCode => true
    case io: java.io.IOException => Option(io.getMessage).exists(_.contains(s"error $LockedErrorCode"))
    case _ => false
  }

  setHandler(in, new AbstractInHandler {
    override def onPush(): Unit = {
      val elem = grab(in)

      // The retry call is deliberately kept outside the try/catch so it is a genuine tail call and
      // an object that stays locked for a long time cannot grow the stack.
      @scala.annotation.tailrec
      def doLookup(): Unit = {
        val outcome: Either[Throwable, ObjectMatrixEntry] =
          try {
            val openVault = vault.getOrElse(throw new IllegalStateException("Vault has not been opened"))
            val obj = openVault.getObject(elem.oid)
            val meta = MetadataHelper.getAttributeMetadataSync(obj)
            Right(elem.copy(attributes = Some(meta), fileAttribues = Some(FileAttributes(MetadataHelper.getMxfsMetadata(obj)))))
          } catch {
            // NonFatal lets OutOfMemoryError, InterruptedException etc. propagate untouched.
            case scala.util.control.NonFatal(err) => Left(err)
          }

        outcome match {
          case Right(updated) =>
            push(out, updated)
          case Left(err) if isLockError(err) =>
            if (ignoreOnLocked) {
              logger.error(s"OMLookupMetadata got 'unable to lock object' on ${elem.oid}, ignoring")
              // Drop this element and ask upstream for the next one.
              pull(in)
            } else {
              logger.error(s"OMLookupMetadata got 'unable to lock object' on ${elem.oid}, retrying")
              // NOTE(review): this blocks the thread running the stage for the duration of the pause.
              Thread.sleep(RetryDelayMillis)
              doLookup()
            }
          case Left(err) =>
            logger.error("Could not look up object metadata: ", err)
            failStage(err)
        }
      }

      doLookup()
    }
  })

  setHandler(out, new AbstractOutHandler {
    // Downstream demand is forwarded straight to upstream.
    override def onPull(): Unit = pull(in)
  })

  /** Opens the vault once, before any element is processed. */
  override def preStart(): Unit = {
    vault = Some(MatrixStore.openVault(userInfo))
  }

  /** Releases the vault, if one was opened, when the stage stops for any reason. */
  override def postStop(): Unit = {
    logger.info("Stream terminated")
    vault.foreach(_.dispose())
  }
}