app/streamcomponents/OMLookupMetadata.scala (71 lines of code) (raw):
package streamcomponents
import akka.stream.scaladsl.{Keep, RunnableGraph, Sink, Source}
import akka.stream.{ActorMaterializer, Attributes, FlowShape, Inlet, Materializer, Outlet}
import akka.stream.stage.{AbstractInHandler, AbstractOutHandler, GraphStage, GraphStageLogic}
import com.om.mxs.client.internal.TaggedIOException
import com.om.mxs.client.japi.{MXFSFileAttributes, MatrixStore, MxsObject, UserInfo, Vault}
import helpers.MetadataHelper
import models.{FileAttributes, MxsMetadata, ObjectMatrixEntry}
import org.slf4j.LoggerFactory
import scala.collection.JavaConverters._
import scala.concurrent.ExecutionContext
import scala.util.{Failure, Success}
/**
* look up metadata for the given objectmatrix entry
* @param userInfo UserInfo instance giving the appliance to connect to
*/
class OMLookupMetadata(userInfo:UserInfo, ignoreOnLocked:Boolean=true) extends GraphStage[FlowShape[ObjectMatrixEntry,ObjectMatrixEntry]] {
private final val in:Inlet[ObjectMatrixEntry] = Inlet.create("OMLookupMetadata.in")
private final val out:Outlet[ObjectMatrixEntry] = Outlet.create("OMLookupMetadata.out")
override def shape: FlowShape[ObjectMatrixEntry, ObjectMatrixEntry] = FlowShape.of(in,out)
override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new GraphStageLogic(shape) {
private val logger = LoggerFactory.getLogger(getClass)
private var vault:Option[Vault] = None
setHandler(in, new AbstractInHandler {
override def onPush(): Unit = {
val elem=grab(in)
def doLookup():Unit = {
try {
val obj = vault.get.getObject(elem.oid)
val meta = MetadataHelper.getAttributeMetadataSync(obj)
val updated = elem.copy(attributes = Some(meta), fileAttribues = Some(FileAttributes(MetadataHelper.getMxfsMetadata(obj))))
push(out, updated)
} catch {
case err:java.io.IOException=>
if(err.getMessage.contains("error 311")){
logger.error(s"OMLookupMetadata got 'unable to lock object' on ${elem.oid}, retrying")
if(ignoreOnLocked){
pull(in)
} else {
Thread.sleep(1000)
doLookup()
}
} else {
logger.error(s"Could not look up object metadata: ", err)
failStage(err)
}
case err:TaggedIOException=>
if(err.getError==311){
logger.error(s"OMLookupMetadata got 'unable to lock object' on ${elem.oid}, retrying")
Thread.sleep(1000)
doLookup()
} else {
logger.error(s"Could not look up object metadata: ", err)
failStage(err)
}
case err: Throwable =>
logger.error(s"Could not look up object metadata: ", err)
failStage(err)
}
}
doLookup()
}
})
setHandler(out, new AbstractOutHandler {
override def onPull(): Unit = pull(in)
})
override def preStart(): Unit = {
vault = Some(MatrixStore.openVault(userInfo))
}
override def postStop(): Unit = {
logger.info("Stream terminated")
vault.map(_.dispose())
}
}
}