app/drivers/MatrixStoreDriver.scala (216 lines of code) (raw):
package drivers
import java.io.{InputStream, OutputStream}
import java.time.ZonedDateTime
import akka.stream.Materializer
import com.om.mxs.client.japi.{Attribute, Constants, MxsObject, SearchTerm, Vault}
import drivers.objectmatrix.{MXSConnectionBuilder, MxsMetadata, ObjectMatrixEntry}
import helpers.StorageHelper
import models.StorageEntry
import org.slf4j.LoggerFactory
import play.api.inject.Injector
import java.nio.file.Paths
import scala.concurrent.{Await, Future}
import scala.util.{Failure, Success, Try}
import scala.jdk.CollectionConverters._
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._
/**
 * @param storageRef [[StorageEntry]] instance that this driver instance is associated with
 * @param injector implicitly provided Play [[Injector]], used to obtain the MXSConnectionManager instance
 */
class MatrixStoreDriver(override val storageRef: StorageEntry)(implicit injector:Injector) extends StorageDriver {
private val logger = LoggerFactory.getLogger(getClass)
private val connectionManager = injector.instanceOf(classOf[MXSConnectionManager])
/**
 * Runs an operation against the vault that `storageRef` points to.
 *
 * Host, device, user and password are all taken from `storageRef`. If any of them is absent the problem is
 * logged and a Failure is returned without attempting a connection. Otherwise a connection is requested from
 * the connection manager and the block is handed on to `MXSConnectionBuilder.withVault` together with the device.
 * NOTE(review): disposal of the vault is presumably done inside `MXSConnectionBuilder.withVault` - confirm there.
 * @param blk operation to perform. It receives the Vault and returns its own result wrapped in a Try.
 * @tparam A type of the value produced by the block
 * @return the result of the block, or a Failure if the storage is misconfigured or no connection could be obtained
 */
def withVault[A](blk:Vault=>Try[A]):Try[A] = {
  val settings = for {
    host     <- storageRef.host
    device   <- storageRef.device
    user     <- storageRef.user
    password <- storageRef.password
  } yield (host, device, user, password)

  settings match {
    case None =>
      logger.error(s"Storage ${storageRef.id} is misconfigured and is missing at least one of host(s), device, username or password")
      Failure(new RuntimeException(s"Storage ${storageRef.id} is misconfigured"))
    case Some((host, device, user, password)) =>
      connectionManager
        .getConnection(host, user, password)
        .flatMap(connection => MXSConnectionBuilder.withVault(connection, device)(blk))
  }
}
/**
 * Fetches the object with the given OID from the vault and passes it to `blk`.
 * An exception thrown while fetching the object is captured as a Failure, in which case `blk` is not called.
 * @param vault Vault to fetch the object from
 * @param oid object ID to fetch
 * @param blk operation to perform on the object, returning its result wrapped in a Try
 * @tparam A type of the value produced by the block
 * @return the result of the block, or a Failure if the object could not be fetched
 */
def withObject[A](vault:Vault,oid:String)(blk:MxsObject=>Try[A]):Try[A] =
  Try(vault.getObject(oid)).flatMap(blk)
/**
 * Directly write an InputStream to the given path at the given version, until EOF (blocking).
 * If no object exists yet for the path and version a new one is created, otherwise the existing one is written to.
 * Errors raised while locating/creating the object, opening its output stream, copying or closing the stream
 * are all reported through the returned Try rather than thrown.
 * @param path [[String]] absolute path to write
 * @param version version number of the file to write
 * @param dataStream [[java.io.InputStream]] to read the content from
 * @return a Try with a unit value if the copy succeeded, otherwise a Failure describing what went wrong
 */
def writeDataToPath(path:String, version:Int, dataStream:InputStream):Try[Unit] = withVault { vault=>
  // Locates (or creates) the target object and opens it for writing. This can throw, so it is only ever
  // evaluated as the by-name resource of Using below, which turns any exception into a Failure.
  def openTarget() = {
    val mxsFile = lookupPath(vault, path, version) match {
      case None =>
        logger.debug(s"Object for $path $version does not exist, creating new...")
        val fileMeta = newFileMeta(path, version, None)
        vault.createObject(fileMeta.toAttributes.toArray)
      case Some(oid) =>
        logger.debug(s"Object for $path $version already exists at $oid")
        vault.getObject(oid)
    }
    mxsFile.newOutputStream()
  }

  // Using closes the stream whether or not the copy succeeds and captures a failing close() as well.
  scala.util.Using(openTarget()) { stream =>
    StorageHelper.copyStream(dataStream, stream)
  }.map(copiedSize=>logger.info(s"Copied $path: $copiedSize bytes"))
}
/**
 * Extracts the extension of a filename, i.e. the text following its last dot.
 * A candidate of 8 or more characters is not considered to be an extension; a warning is logged and None returned.
 * @param fileNameString filename string
 * @return the extension (without the dot), or None if the name has no extension or the candidate is too long
 */
def getFileExt(fileNameString:String):Option[String] = {
  val extensionPattern = ".*\\.([^\\.]+)$".r

  val candidate = fileNameString match {
    case extensionPattern(found) => Some(found)
    case _ => None
  }

  candidate.flatMap { xtn =>
    if (xtn.length >= 8) {
      logger.warn(s"$xtn does not look like a file extension (too long), assuming no actual extension")
      None
    } else {
      Some(xtn)
    }
  }
}
/**
 * Builds the initial metadata for a new object that will hold the file at `pathString`.
 * Modification and access time are both set to the current time, the object is marked as not in the trash
 * and the requested version is stored in PROJECTLOCKER_VERSION.
 * @param pathString path of the file
 * @param version version number to record against the object
 * @param length size of the content in bytes if it is known up-front; when present it is recorded as DPSP_SIZE
 * @return an [[MxsMetadata]] describing the new file
 */
def newFileMeta(pathString:String, version:Int, length:Option[Long]): MxsMetadata = {
  val nowMillis = ZonedDateTime.now().toInstant.toEpochMilli
  val path = Paths.get(pathString)
  // Path.getFileName returns null for a path without name elements (e.g. the root "/"); use an empty name
  // in that case instead of failing with a NullPointerException.
  val fileName = Option(path.getFileName).map(_.toString).getOrElse("")

  val initialMeta = MxsMetadata(
    stringValues = Map(
      "MXFS_FILENAME_UPPER" -> path.toString.toUpperCase,
      "MXFS_FILENAME"->fileName,
      "MXFS_PATH"->path.toString,
      "MXFS_MIMETYPE"->"application/octet-stream",
      "MXFS_DESCRIPTION"->s"Projectlocker project $path",
      "MXFS_PARENTOID"->"",
      "MXFS_FILEEXT"->getFileExt(fileName).getOrElse("")
    ),
    boolValues = Map(
      "MXFS_INTRASH"->false,
    ),
    longValues = Map(
      "MXFS_MODIFICATION_TIME"->nowMillis,
      "MXFS_ACCESS_TIME"->nowMillis,
    ),
    intValues = Map(
      "MXFS_COMPATIBLE"->1,
      "MXFS_CATEGORY"->4,  //set type to "document",
      "PROJECTLOCKER_VERSION"->version
    )
  )

  // only record a size when the caller already knows it
  length.fold(initialMeta) { actualLength =>
    initialMeta.copy(longValues = initialMeta.longValues + ("DPSP_SIZE"->actualLength))
  }
}
/**
 * Directly write a byte array to the given path at the given version (blocking).
 * If no object exists yet for the path and version a new one is created (with its size recorded in the
 * metadata), otherwise the existing one is written to. Errors raised while locating/creating the object,
 * opening its output stream, writing or closing are all reported through the returned Try rather than thrown.
 * @param path [[String]] absolute path to write
 * @param version version number of the file to write
 * @param data [[Array]] (of bytes) - byte array to output
 * @return a Try indicating success or failure. If successful the Try has a unit value.
 */
def writeDataToPath(path:String, version:Int, data:Array[Byte]):Try[Unit] = withVault { vault=>
  for {
    mxsFile <- Try {
      lookupPath(vault, path, version) match {
        case None=>
          logger.debug(s"No path found for $path at version $version on ${vault.getId}")
          val fileMeta = newFileMeta(path, version, Some(data.length.toLong))
          vault.createObject(fileMeta.toAttributes.toArray)
        case Some(oid)=>
          logger.debug(s"Found entry $oid for path $path at version $version on ${vault.getId}")
          vault.getObject(oid)
      }
    }
    // Using closes the stream in every case and converts open/write/close exceptions into a Failure
    _ <- scala.util.Using(mxsFile.newOutputStream())(_.write(data))
  } yield ()
}
/**
 * Delete the file at the given path and version (blocking).
 * A missing file and any error reported while deleting are both logged and reported as `false`.
 * @param path [[String]] absolute path to delete
 * @param version version number of the file to delete
 * @return [[Boolean]] indicating whether the file was deleted or not.
 */
def deleteFileAtPath(path:String, version:Int):Boolean = {
  val outcome: Try[Boolean] = withVault { vault=>
    lookupPath(vault, path, version).fold[Try[Boolean]] {
      logger.error(s"No file to delete at $path with version $version on ${storageRef.repr}")
      Success(false)
    } { oid =>
      withObject(vault, oid) { mxsObject =>
        logger.info(s"Deleting MXS file $oid (path $path, version $version)...")
        mxsObject.delete()
        Success(true)
      }
    }
  }

  outcome match {
    case Failure(err) =>
      logger.error(s"Could not delete file at $path on $storageRef: ", err)
      false
    case Success(deleted) => deleted
  }
}
/**
 * Get a relevant type of InputStream to read a file's data.
 * NOTE(review): the stream is handed back after the `withVault` block has finished; if the vault is disposed
 * at that point the stream may no longer be usable - confirm against MXSConnectionBuilder.withVault.
 * @param path [[String]] Absolute path to open
 * @param version version number of the file to open
 * @return [[java.io.InputStream]] subclass wrapped in a [[Try]]; a Failure if the file does not exist
 */
def getReadStream(path:String, version:Int):Try[InputStream] = withVault { vault=>
  lookupPath(vault, path, version)
    .map(oid => withObject(vault, oid)(mxsObject => Try(mxsObject.newInputStream())))
    .getOrElse(Failure(new RuntimeException(s"File $path does not exist")))
}
/**
 * Get a relevant type of OutputStream to write a file's data. this may truncate the file.
 * When nothing exists yet for the path and version, a new object is created first and its stream returned.
 * @param path [[String]] Absolute path to open
 * @param version version number of the file to open
 * @return [[java.io.OutputStream]] subclass wrapped in a [[Try]]
 */
def getWriteStream(path:String, version:Int):Try[OutputStream] = withVault { vault=>
  logger.info(s"Writing to file at path $path with version $version on ${storageRef.repr}")
  lookupPath(vault, path, version) match {
    case Some(oid)=>
      withObject(vault, oid)(mxsObject => Try(mxsObject.newOutputStream()))
    case None=>
      logger.debug(s"No path found for $path at version $version on ${vault.getId}")
      val fileMeta = newFileMeta(path, version, None)
      for {
        created <- Try(vault.createObject(fileMeta.toAttributes.toArray))
        stream  <- Try(created.newOutputStream())
      } yield stream
  }
}
/**
 * Get the metadata relevant to the specified file: its size, last-modified time, the value of its
 * PROJECTLOCKER_VERSION attribute and its OID.
 * Any failure (including the file not existing) is logged as an error and reported as None.
 * @param path String Absolute path to open
 * @param version version number of the file to look up
 * @return either a [[MatrixStoreMetadata]] object with the requested information or None if it could not be obtained.
 */
def getMetadata(path:String, version:Int):Option[MatrixStoreMetadata] = {
  val lookedUp: Try[MatrixStoreMetadata] = withVault { vault=>
    lookupPath(vault, path, version) match {
      case Some(oid) =>
        Try {
          val mxsObj = vault.getObject(oid)
          val attrView = mxsObj.getAttributeView
          val fileAttrs = mxsObj.getMXFSFileAttributeView.readAttributes()
          MatrixStoreMetadata(fileAttrs.size(), fileAttrs.lastModifiedTime(), attrView.readInt("PROJECTLOCKER_VERSION"), oid)
        }
      case None =>
        Failure(new RuntimeException(s"File $path at version $version does not exist on this storage"))
    }
  }

  lookedUp.failed.foreach(err => logger.error(s"Could not get metadata for $path at version $version: ", err))
  lookedUp.toOption
}
/**
 * Searches the vault for objects whose MXFS_FILENAME matches the given filename, regardless of version number.
 * @param vault vault to query
 * @param fileName filename to look for
 * @return the search results (OIDs) as a sequence
 */
def versionsForFile(vault:Vault, fileName:String) = {
  logger.debug(s"Lookup $fileName on ${vault.getId}")
  val hits = vault.searchObjects(SearchTerm.createSimpleTerm("MXFS_FILENAME", fileName), 1)
  hits.asScala.toSeq
}
/**
 * Look up a given (unique) path at a given version on the storage, ignoring anything that is in the trash.
 * When nothing matches, a second search for the path at any version is run purely so that the hits can be
 * logged for diagnostics; it does not influence the result.
 * @param vault Vault reference to perform search on
 * @param path Path to look for (in MXFS_PATH)
 * @param version version number to look for (in PROJECTLOCKER_VERSION)
 * @return either the OID of the first matching file or None.
 */
def lookupPath(vault:Vault, path:String, version:Int) = {
  // diagnostics only: log whatever exists for this path at other versions
  def reportOtherVersions(): Unit = {
    val allVersionsQuery = SearchTerm.createSimpleTerm(Constants.CONTENT, s"""MXFS_PATH:\"$path\" AND MXFS_INTRASH:0""")
    val allVersions = vault.searchObjects(allVersionsQuery, 10).asScala.toSeq
    logger.info(s"Found ${allVersions.length} hits for path $path")
    allVersions.foreach(oid=>logger.info(s"\t$path: $oid"))
  }

  logger.debug(s"Lookup $path at version $version on OM vault ${vault.getId}")
  val searchTerm = SearchTerm.createSimpleTerm(Constants.CONTENT, s"""MXFS_PATH:\"$path\" AND PROJECTLOCKER_VERSION:$version AND MXFS_INTRASH:0""")
  //val searchTerm = SearchTerm.createSimpleTerm("PROJECTLOCKER_VERSION", version)
  val firstHit = vault.searchObjects(searchTerm, 1).asScala.toSeq.headOption

  firstHit.foreach(entry => logger.debug(s"Got $entry as the OID for $path at version $version"))
  if (firstHit.isEmpty) {
    logger.info(s"Could not find anything for $path at version $version")
    reportOtherVersions()
  }
  firstHit
}
/**
 * Does the given path exist at the given version on this storage?
 * An empty path is treated as a check that the vault itself can be reached.
 * Unlike the other lookups in this driver, an error reported while accessing the vault is rethrown to the caller.
 * @param path path to check, or the empty string to check the vault only
 * @param version version number to check for
 * @return true if a matching object exists (or the path is empty and the vault was reached), false otherwise
 */
def pathExists(path:String, version:Int):Boolean = {
  val checked: Try[Boolean] = withVault { vault=>
    logger.debug(s"Lookup $path on ${vault.getId}")
    if (path != "") {
      val maybeOid = lookupPath(vault, path, version)
      maybeOid match {
        case None=>
          logger.info(s"Found nothing for $path at version $version")
        case Some(oid)=>
          logger.info(s"Found $oid for $path at version $version")
      }
      Success(maybeOid.isDefined)
    } else {
      //checking if blank path exists means check if the vault exists. If we get here, then it should do.
      Success(true)
    }
  }

  checked match {
    case Failure(err)=>throw err
    case Success(exists)=>exists
  }
}
/**
 * Whether this driver can hold several versions of the same path. Always true here: objects are written with
 * a PROJECTLOCKER_VERSION attribute and lookups filter on it.
 */
def supportsVersions: Boolean = true
}