app/helpers/MetadataHelper.scala (121 lines of code) (raw):
package helpers
import java.nio.ByteBuffer
import akka.stream.Materializer
import akka.stream.scaladsl.{Keep, Sink, Source}
import com.om.mxs.client.internal.TaggedIOException
import com.om.mxs.client.japi.MxsObject
import models.MxsMetadata
import org.apache.commons.codec.binary.Hex
import org.slf4j.LoggerFactory
import scala.concurrent.{ExecutionContext, Future}
import scala.collection.JavaConverters._
import scala.util.{Failure, Success, Try}
object MetadataHelper {
private val logger = LoggerFactory.getLogger(getClass)
/**
 * Iterates the attributes exposed by the object's attribute view and collects them into an [[MxsMetadata]],
 * bucketed by value type. ByteBuffer values are hex-encoded and stored with the string values.
 * Values of any other type (including null) are logged at WARN level and skipped.
 * @param obj [[MxsObject]] entity to retrieve information from
 * @param mat implicitly provided materializer for streams
 * @param ec implicitly provided execution context
 * @return a Future, with the collected [[MxsMetadata]]
 */
def getAttributeMetadata(obj:MxsObject)(implicit mat:Materializer, ec:ExecutionContext):Future[MxsMetadata] = {
  // Hex-encodes the full content of a buffer. `array()` throws for direct or read-only buffers, so those
  // are copied out through a duplicate, which leaves the original buffer's position and limit untouched.
  def hexOf(buf:ByteBuffer):String =
    if(buf.hasArray) {
      Hex.encodeHexString(buf.array())
    } else {
      val dup = buf.duplicate()
      dup.clear() // position=0, limit=capacity; the content itself is not erased
      val bytes = new Array[Byte](dup.remaining())
      dup.get(bytes)
      Hex.encodeHexString(bytes)
    }

  val view = obj.getAttributeView

  val sink = Sink.fold[MxsMetadata,(String,Any)](MxsMetadata.empty) { (acc, elem) =>
    val (key, value) = elem
    value match {
      case boolValue:Boolean => acc.copy(boolValues = acc.boolValues + (key -> boolValue))
      case intValue:Int => acc.copy(intValues = acc.intValues + (key -> intValue))
      case longValue:Long => acc.copy(longValues = acc.longValues + (key -> longValue))
      case floatValue:java.lang.Float => acc.copy(floatValues = acc.floatValues + (key -> floatValue.floatValue()))
      case byteBuffer:ByteBuffer => acc.copy(stringValues = acc.stringValues + (key -> hexOf(byteBuffer)))
      case stringValue:String => acc.copy(stringValues = acc.stringValues + (key -> stringValue))
      case other =>
        // a null value matches none of the typed patterns above, so getClass must not be called on it directly
        val typeName = Option(other).map(_.getClass.toString).getOrElse("null")
        logger.warn(s"Could not get metadata value for $key on ${obj.getId}, type $typeName not recognised")
        acc
    }
  }

  Source.fromIterator(()=>view.iterator.asScala)
    .map(entry=>(entry.getKey, entry.getValue))
    .toMat(sink)(Keep.right)
    .run()
}
/**
 * Synchronous variant of the attribute metadata lookup: walks the object's attribute view on the calling
 * thread and collects the values into an [[MxsMetadata]], bucketed by value type.
 * ByteBuffer values are hex-encoded and stored with the string values. Values of any other type
 * (including null) are logged at WARN level and skipped.
 * @param obj [[MxsObject]] entity to retrieve information from
 * @return the collected [[MxsMetadata]]
 */
def getAttributeMetadataSync(obj:MxsObject):MxsMetadata = {
  // Hex-encodes the full content of a buffer. `array()` throws for direct or read-only buffers, so those
  // are copied out through a duplicate, which leaves the original buffer's position and limit untouched.
  def hexOf(buf:ByteBuffer):String =
    if(buf.hasArray) {
      Hex.encodeHexString(buf.array())
    } else {
      val dup = buf.duplicate()
      dup.clear() // position=0, limit=capacity; the content itself is not erased
      val bytes = new Array[Byte](dup.remaining())
      dup.get(bytes)
      Hex.encodeHexString(bytes)
    }

  val view = obj.getAttributeView

  view.iterator.asScala.foldLeft(MxsMetadata.empty) { (acc, elem) =>
    val key = elem.getKey
    // widen to Any by ascription (no cast needed) so primitive-typed patterns can be matched
    val value:Any = elem.getValue
    value match {
      case boolValue:Boolean => acc.copy(boolValues = acc.boolValues + (key -> boolValue))
      case intValue:Int => acc.copy(intValues = acc.intValues + (key -> intValue))
      case longValue:Long => acc.copy(longValues = acc.longValues + (key -> longValue))
      case floatValue:java.lang.Float => acc.copy(floatValues = acc.floatValues + (key -> floatValue.floatValue()))
      case byteBuffer:ByteBuffer => acc.copy(stringValues = acc.stringValues + (key -> hexOf(byteBuffer)))
      case stringValue:String => acc.copy(stringValues = acc.stringValues + (key -> stringValue))
      case other =>
        // a null value matches none of the typed patterns above, so getClass must not be called on it directly
        val typeName = Option(other).map(_.getClass.toString).getOrElse("null")
        logger.warn(s"Could not get metadata value for $key on ${obj.getId}, type $typeName not recognised")
        acc
    }
  }
}
/**
 * Reads the MXFS file attributes of the given object.
 * @param obj [[MxsObject]] entity to retrieve information from
 * @return the result of calling `readAttributes()` on the object's MXFS file attribute view
 */
def getMxfsMetadata(obj:MxsObject) =
  obj.getMXFSFileAttributeView.readAttributes()
/**
 * Writes the string, long, int and boolean values of the given metadata onto the object's attribute view,
 * in that order, issuing one write call per entry.
 * NOTE(review): `floatValues` are not written here - confirm whether that is intentional.
 * @param obj [[MxsObject]] entity to write the values to
 * @param newMetadata [[MxsMetadata]] holding the values to write
 */
def setAttributeMetadata(obj:MxsObject, newMetadata:MxsMetadata) = {
  val attributes = obj.getAttributeView
  //one write call per attribute; probably not the most efficient way of doing this
  newMetadata.stringValues.foreach { case (key, value) => attributes.writeString(key, value) }
  newMetadata.longValues.foreach { case (key, value) => attributes.writeLong(key, value) }
  newMetadata.intValues.foreach { case (key, value) => attributes.writeInt(key, value) }
  newMetadata.boolValues.foreach { case (key, value) => attributes.writeBoolean(key, value) }
}
/**
 * Checks whether the given byte array contains at least one non-zero byte in the index range
 * [charAt, length), where length is `maybeLength` if given, or else the array length.
 * The range is clamped to the bounds of the array, so a negative `charAt` or a `maybeLength` larger than
 * the array is treated as "start of array" / "end of array" rather than raising an index error.
 * @param arr the bytes to inspect
 * @param charAt index to start scanning from (default 0)
 * @param maybeLength optional exclusive end index; defaults to the array length
 * @return true if any byte in the range is non-zero, false otherwise (including for an empty range)
 */
def isNonNull(arr:Array[Byte], charAt:Int=0, maybeLength:Option[Int]=None):Boolean = {
  val start = math.max(charAt, 0)
  val end = math.min(maybeLength.getOrElse(arr.length), arr.length)
  // an empty range (start >= end) yields false without touching the array
  (start until end).exists(idx => arr(idx) != 0)
}
/**
 * Requests the MD5 checksum of the given object, as calculated by the appliance, by reading the
 * `__mxs__calc_md5` attribute.
 * As per the MatrixStore documentation, a blank (all-zero) value implies that the digest is still being
 * calculated; in this case we sleep 1 second and try again. A "302" (server busy) error from the appliance
 * is retried after 500ms. Any other error is returned immediately.
 * NOTE: the waiting is done with Thread.sleep on the calling thread, so this method blocks; callers that
 * must not block should invoke it from a thread where blocking is acceptable.
 * @param f MxsObject representing the object to checksum
 * @param maxAttempts maximum number of reads to attempt before giving up (default 2)
 * @return a Try containing the hex-encoded checksum. On failure it holds either the underlying
 *         error, or a RuntimeException if no valid checksum was obtained within `maxAttempts` reads.
 */
def getOMFileMd5(f:MxsObject, maxAttempts:Int=2):Try[String] = {
  // terminal failure; `tries` is the number of reads that were actually made
  def exhausted(tries:Int):Try[String] =
    Failure(new RuntimeException(s"Could not get valid checksum after $tries tries"))

  // waits and then reads again, unless the attempt just made was the last one permitted, in which case
  // there is nothing to wait for and we fail straight away
  def retryAfter(delayMs:Long, attempt:Int):Try[String] =
    if(attempt >= maxAttempts) {
      exhausted(attempt)
    } else {
      Thread.sleep(delayMs) //this feels nasty but without resorting to actors i can't think of an elegant way
                            //to delay and re-call in a non-blocking way
      lookup(attempt + 1)
    }

  def lookup(attempt:Int):Try[String] =
    if(attempt > maxAttempts) {
      // only reachable when maxAttempts < 1, i.e. no read is permitted at all
      exhausted(attempt - 1)
    } else {
      val view = f.getAttributeView

      val result = Try {
        logger.debug(s"getting result for ${f.getId}...")
        val buf = ByteBuffer.allocate(16)
        view.read("__mxs__calc_md5", buf)
        buf
      }

      result match {
        case Failure(err:TaggedIOException)=>
          if(err.getError==302){
            logger.warn(s"Got 302 (server busy) from appliance, retrying after delay")
            retryAfter(500, attempt)
          } else {
            Failure(err)
          }
        case Failure(err:java.io.IOException)=>
          // getMessage may be null, so guard before searching it
          if(Option(err.getMessage).exists(_.contains("error 302"))){
            logger.warn(err.getMessage)
            logger.warn(s"Got an error containing 302 string, assuming server busy, retrying after delay")
            retryAfter(500, attempt)
          } else {
            Failure(err)
          }
        case Failure(otherError)=>Failure(otherError)
        case Success(buffer)=>
          val arr = buffer.array()
          if(! isNonNull(arr)) {
            logger.info(s"Empty string returned for file MD5 on attempt $attempt, assuming still calculating. Will retry...")
            retryAfter(1000, attempt)
          } else {
            // the buffer is always allocated with 16 bytes, so the hex string is always 32 characters long
            Success(Hex.encodeHexString(arr))
          }
      }
    }

  lookup(1)
}
}