app/models/FileEntryDAO.scala (276 lines of code) (raw):
package models
import akka.stream.scaladsl.Source
import drivers.StorageDriver
import org.slf4j.LoggerFactory
import play.api.Logger
import play.api.db.slick.DatabaseConfigProvider
import play.api.inject.Injector
import play.api.mvc.RawBuffer
import slick.jdbc.PostgresProfile
import slick.jdbc.PostgresProfile.api._
import slick.lifted.TableQuery
import java.io.{File, FileInputStream, InputStream}
import java.nio.file.{Files, Path, Paths}
import javax.inject.{Inject, Singleton}
import scala.concurrent.{ExecutionContext, Future}
import scala.util.{Failure, Success, Try}
@Singleton
class FileEntryDAO @Inject() (dbConfigProvider:DatabaseConfigProvider)(implicit ec:ExecutionContext, injector:Injector) {
  private final val db = dbConfigProvider.get[PostgresProfile].db
  private final val logger = LoggerFactory.getLogger(getClass)

  /**
   * writes this model into the database, inserting if id is None and returning a fresh object with id set. If an id
   * was set, the row with that id is updated and the same object is returned.
   * @param entry FileEntry to persist
   * @return a Future containing a Try with the saved [[FileEntry]]; database errors are captured in the Try
   */
  def save(entry:FileEntry):Future[Try[FileEntry]] = entry.id match {
    case None =>
      val insertQuery = TableQuery[FileEntryRow] returning TableQuery[FileEntryRow].map(_.id) into ((item, id) => item.copy(id = Some(id)))
      db.run((insertQuery += entry).asTry)
    case Some(realEntityId) =>
      // the update yields a row count which callers don't need; hand back the entry that was written instead
      db.run(
        TableQuery[FileEntryRow].filter(_.id === realEntityId).update(entry).asTry
      ).map(_.map(_ => entry))
  }

  /**
   * like `save`, but surfaces a database error as a failed Future rather than a Failure inside a successful one
   * @param entry FileEntry to persist
   * @return a Future containing the saved [[FileEntry]]
   */
  def saveSimple(entry:FileEntry):Future[FileEntry] =
    save(entry).flatMap(result => Future.fromTry(result))

  /**
   * returns a StorageEntry object for the id of the storage of this FileEntry
   * @param entry FileEntry to query
   * @return a Future containing the StorageEntry, or None if no storage row matches entry.storageId
   */
  def storage(entry:FileEntry):Future[Option[StorageEntry]] =
    db.run(
      TableQuery[StorageEntryRow].filter(_.id === entry.storageId).result
    ).map(_.headOption)

  /**
   * combines an (optional) storage root with the entry's relative path.
   * If there is no storage, the entry's filepath is returned unchanged.
   */
  private def fullPathOn(maybeStorage:Option[StorageEntry], entry:FileEntry):String = maybeStorage match {
    case Some(storageRef) => Paths.get(storageRef.rootpath.getOrElse(""), entry.filepath).toString
    case None => entry.filepath
  }

  /**
   * Get a full path of the file, including the root path of the storage
   * @param entry FileEntry to query
   * @return Future containing a string. If the storage cannot be found, this is just the entry's filepath.
   */
  def getFullPath(entry:FileEntry):Future[String] =
    storage(entry).map(maybeStorage => fullPathOn(maybeStorage, entry))

  /**
   * Gets a java.nio.file.Path pointing to the given file. Only works for files on "Local" storages
   * (or files whose storage record cannot be found, in which case the filepath is used as-is).
   * @param entry FileEntry to query
   * @return a Future containing the Path; the Future fails if the file does not exist or the storage is non-local
   */
  def getJavaPath(entry:FileEntry):Future[Path] = storage(entry)
    .map({
      case Some(storage) =>
        if(storage.storageType=="Local") {
          val p = Paths.get(storage.rootpath.getOrElse(""), entry.filepath)
          if (Files.exists(p)) {
            p
          } else {
            throw new RuntimeException(s"${p.toString} does not exist")
          }
        } else {
          logger.error("Cannot getJavaFile for a project that is on a non-local storage")
          throw new RuntimeException(s"${entry.filepath} with id ${entry.id} is on storage ${storage.id} which is of non-local type ${storage.storageType}")
        }
      case None =>
        val f = new File(entry.filepath)
        if(f.exists()) {
          f.toPath
        } else {
          throw new RuntimeException(s"${f.toString} does not exist")
        }
    })

  /**
   * Gets a java.io.File pointing to the given file; see getJavaPath for the failure conditions
   * @param entry FileEntry to query
   * @return a Future containing the File
   */
  def getJavaFile(entry:FileEntry):Future[File] = getJavaPath(entry).map(_.toFile)

  /**
   * this attempts to delete the file from disk, using the configured storage driver
   * @param entry FileEntry to delete
   * @return A future containing either a Right() containing a Boolean indicating whether the delete happened, or a Left with an error string
   */
  def deleteFromDisk(entry:FileEntry):Future[Either[String,Boolean]] =
    // a single storage lookup provides both the driver and the storage root for the full path
    storage(entry).map { maybeStorage =>
      maybeStorage.flatMap(_.getStorageDriver) match {
        case Some(storagedriver) =>
          Right(storagedriver.deleteFileAtPath(fullPathOn(maybeStorage, entry), entry.version))
        case None =>
          Left("No storage driver configured for storage")
      }
    }

  /**
   * attempt to delete the underlying record from the database, along with any FileAssociation rows pointing at it.
   * Both deletes run in one transaction so a partial failure cannot leave the records half-removed.
   * @param entry FileEntry to delete
   * @return a Future with no value on success. On failure, the future fails; pick this up with .recover() or .onComplete
   */
  def deleteRecord(entry:FileEntry):Future[Unit] =
    entry.id match {
      case Some(databaseId) =>
        logger.info(s"Deleting database record for file $databaseId (${entry.filepath} on storage ${entry.storageId})")
        db.run(
          DBIO.seq(
            TableQuery[FileAssociationRow].filter(_.fileEntry === databaseId).delete,
            TableQuery[FileEntryRow].filter(_.id === databaseId).delete
          ).transactionally
        )
      case None =>
        Future.failed(new RuntimeException("Cannot delete a record that has not been saved to the database"))
    }

  /**
   * private method to (synchronously) write a buffer of content to the underlying file. Called by the public method writeToFile().
   * @param entry FileEntry being written; its version is passed through to the storage driver
   * @param buffer [[play.api.mvc.RawBuffer]] containing content to write
   * @param outputPath absolute path to write content to.
   * @param storageDriver [[StorageDriver]] instance to do the actual writing
   * @return a Try containing the unit value
   */
  private def writeContent(entry:FileEntry, buffer: RawBuffer, outputPath:java.nio.file.Path, storageDriver:StorageDriver):Try[Unit] = {
    val contentLogger = Logger(getClass)
    buffer.asBytes() match {
      case Some(bytes) => //the buffer is held in memory
        contentLogger.debug("uploadContent: writing memory buffer")
        storageDriver.writeDataToPath(outputPath.toString, entry.version, bytes.toArray)
      case None => //the buffer is on-disk
        contentLogger.debug("uploadContent: writing disk buffer")
        // opening the file can throw; capture that in the Try, and always close the stream even if the write throws
        Try(new FileInputStream(buffer.asFile)).flatMap { fileInputStream =>
          try {
            storageDriver.writeDataToPath(outputPath.toString, entry.version, fileInputStream)
          } finally {
            fileInputStream.close()
          }
        }
    }
  }

  /**
   * Update the hasContent flag
   * @param entry FileEntry to update
   * @return a Future containing a Try, which contains the number of rows updated. The Try is a Failure if the
   *         entry has not been saved or the database update failed.
   */
  def updateFileHasContent(entry:FileEntry):Future[Try[Int]] = entry.id match {
    case Some(recordId) =>
      val updateFileref = entry.copy(hasContent = true)
      db.run(
        TableQuery[FileEntryRow].filter(_.id === recordId).update(updateFileref).asTry
      )
    case None =>
      Future.successful(Failure(new RuntimeException("Can't update a file record that has not been saved")))
  }

  /**
   * sets the hasContent flag, turning a Failure from updateFileHasContent into a failed Future
   */
  private def markHasContent(entry:FileEntry):Future[Unit] =
    updateFileHasContent(entry).flatMap({
      case Success(_) => Future.successful(())
      case Failure(err) => Future.failed(err)
    })

  /**
   * looks up the storage driver for the entry's storage, failing the Future (and logging) if either the storage
   * record or its driver is missing
   */
  private def storageDriverFor(entry:FileEntry):Future[StorageDriver] =
    storage(entry).flatMap({
      case Some(storageRef) =>
        storageRef.getStorageDriver match {
          case Some(storageDriver) =>
            Future.successful(storageDriver)
          case None =>
            val errorMsg = s"No storage driver available for storage ${entry.storageId}"
            logger.error(errorMsg)
            Future.failed(new RuntimeException(errorMsg))
        }
      case None =>
        val errorMsg = s"No storage could be found for ID ${entry.storageId}"
        logger.error(errorMsg)
        Future.failed(new RuntimeException(errorMsg))
    })

  /**
   * closes the given stream, logging (rather than propagating) any error from close()
   */
  private def closeLoggingErrors(stream:InputStream):Unit =
    try {
      stream.close()
    } catch {
      case closeError: Exception =>
        logger.error("Error occurred while closing input stream", closeError)
    }

  /**
   * Asynchronously writes the content of the given stream to this file, then sets the hasContent flag.
   * The stream is closed once the write has been attempted.
   * @param entry FileEntry to write; must have been saved so the hasContent flag can be updated
   * @param inputStream content to write
   * @return a Future that completes once both the write and the flag update have succeeded, and fails if either fails
   */
  def writeStreamToFile(entry: FileEntry, inputStream: InputStream): Future[Unit] = {
    logger.debug(s"writeStreamToFile called for entry: ${entry.id}")
    storageDriverFor(entry).flatMap { storageDriver =>
      val outputPath = Paths.get(entry.filepath)
      logger.info(s"Preparing to write to $outputPath with storage driver $storageDriver")
      val written: Future[Unit] = Future {
        logger.debug("Initiating file write process using storageDriver...")
        // Using storageDriver to write data from inputStream to the path
        try {
          storageDriver.writeDataToPath(outputPath.toString, entry.version, inputStream)
        } finally {
          closeLoggingErrors(inputStream) // Close the stream after writing, whatever the outcome
        }
      }.flatMap(result => Future.fromTry(result))

      written.failed.foreach(err => logger.error(s"Error occurred during file writing to $outputPath", err))

      written.flatMap { _ =>
        logger.debug("File write process completed. Updating file content status...")
        markHasContent(entry)
      }.map { _ =>
        logger.info(s"File successfully written to $outputPath and content status updated.")
      }
    }
  }

  /**
   * Asynchronously writes the given buffer to this file, then sets the hasContent flag.
   * @param entry FileEntry to write; must have been saved so the hasContent flag can be updated
   * @param buffer content to write
   * @return a Future that completes once both the write and the flag update have succeeded, and fails if either fails
   */
  def writeToFile(entry:FileEntry, buffer: RawBuffer):Future[Unit] =
    storageDriverFor(entry).flatMap { storageDriver =>
      val outputPath = Paths.get(entry.filepath)
      logger.info(s"Writing to $outputPath with $storageDriver")
      for {
        _ <- Future.fromTry(writeContent(entry, buffer, outputPath, storageDriver))
        _ <- markHasContent(entry)
      } yield ()
    }

  /**
   * check if this FileEntry points to something real on disk
   * @param entry FileEntry to query
   * @return a Future, containing a Left with a string if there was an error, or a Right with a Boolean flag indicating if the
   *         pointed object exists on the storage
   */
  def validatePathExists(entry:FileEntry) =
    storage(entry).flatMap { maybeStorage =>
      val filePath = fullPathOn(maybeStorage, entry)
      Future(
        maybeStorage
          .map(_.validatePathExists(filePath, entry.version)) match {
          case Some(result) => result
          case None => Left(s"No storage could be found for ID ${entry.storageId} on file ${entry.id}")
        }
      )
    }

  /**
   * check if this FileEntry points to something real on disk.
   * intended to be used in streaming/looping contexts, this expects a StorageDriver for the relevant storage
   * to be provided externally rather than provisioning one internally
   *
   * @param entry FileEntry to query
   * @param driver StorageDriver for the storage that `entry` lives on
   * @return a Future containing the result of `driver.pathExists` for the entry's full path and version
   */
  def validatePathExistsDirect(entry:FileEntry)(implicit driver:StorageDriver) =
    getFullPath(entry).map(path => driver.pathExists(path, entry.version))

  /**
   * builds a query for the records that are backups of the file `forId`, optionally limited to one storage
   */
  private def makeQuery(forId:Int, forStorage:Option[Int]) = {
    val baseQuery = TableQuery[FileEntryRow].filter(_.backupOf === forId)
    forStorage match {
      case Some(storageId) => baseQuery.filter(_.storage === storageId)
      case None => baseQuery
    }
  }

  /**
   * returns some of the backups for this file. Results are sorted by most recent version first.
   *
   * If the storage does not support versioning you would expect only one result.
   *
   * @param entry the file whose backups are wanted; must have been saved
   * @param forStorage if set, only return backups on this storage
   * @param drop start iterating at this entry
   * @param take only return this many results max
   * @return a Future containing a sequence of FileEntry objects. This fails if there is a problem.
   */
  def backups(entry:FileEntry, forStorage:Option[Int]=None, drop:Int=0, take:Int=100):Future[Seq[FileEntry]] = entry.id match {
    case None =>
      Future.failed(new RuntimeException("A record must be saved before you can query for backups"))
    case Some(fileId) =>
      logger.info(s"Looking for backups of file with id $fileId on storage $forStorage")
      db.run {
        makeQuery(fileId, forStorage)
          .sortBy(_.version.desc.nullsLast)
          .drop(drop)
          .take(take)
          .result
      }
  }

  /**
   * counts the backups for this file
   * @param entry the file whose backups are wanted; must have been saved
   * @param forStorage if set, only count backups on this storage
   * @return a Future containing the count. This fails if the entry is unsaved or the query fails.
   */
  def backupsCount(entry:FileEntry, forStorage:Option[Int]=None):Future[Int] = entry.id match {
    case None =>
      Future.failed(new RuntimeException("A record must be saved before you can query for backups"))
    case Some(fileId) =>
      db.run {
        makeQuery(fileId, forStorage)
          .length
          .result
      }
  }

  /* ------- Constructors and such ------- */
  /**
   * Get a [[FileEntry]] instance for the given database ID
   * @param entryId database ID to look up
   * @return a Future, containing an Option that may contain a [[FileEntry]] instance. The Future fails on a database error.
   */
  def entryFor(entryId: Int):Future[Option[FileEntry]] =
    db.run(
      TableQuery[FileEntryRow].filter(_.id === entryId).result
    ).map(_.headOption)

  /**
   * Get a FileEntry instance for the given filename and storage
   * @param fileName file name to search for (exact match to file path)
   * @param storageId storage ID to search for
   * @param version version number to search for
   * @return a Future, containing a Try that contains a sequence of zero or more FileEntry instances
   */
  def entryFor(fileName: String, storageId: Int, version:Int):Future[Try[Seq[FileEntry]]] =
    db.run(
      TableQuery[FileEntryRow]
        .filter(_.filepath === fileName)
        .filter(_.storage === storageId)
        .filter(_.version === version)
        .result
        .asTry
    )

  /**
   * improved version of entryFor that returns either one or no entries in a more composable way.
   * This should be all that is needed because of table constraints
   * @param fileName the file name to search for (exact match)
   * @param storageId storage ID to search for
   * @param version version number to search for
   * @return a Future containing either a FileEntry or None. The future fails if there is a problem.
   */
  def singleEntryFor(fileName: String, storageId:Int, version:Int):Future[Option[FileEntry]] =
    db.run(
      TableQuery[FileEntryRow].filter(_.filepath === fileName).filter(_.storage === storageId).filter(_.version === version).result
    ).map(_.headOption)

  /**
   * returns every version of the given file on the given storage, most recent version first
   * @param fileName the file name to search for (exact match)
   * @param storageId storage ID to search for
   * @return a Future containing a Try with the matching FileEntry records
   */
  def allVersionsFor(fileName: String, storageId: Int):Future[Try[Seq[FileEntry]]] =
    db.run(
      TableQuery[FileEntryRow].filter(_.filepath === fileName).filter(_.storage === storageId).sortBy(_.version.desc.nullsLast).result.asTry
    )

  /**
   * returns a list of matching records for the given file name, ordered by most recent first (if versioning is enabled)
   * @param target file path to query. this should be a relative filepath for the given storage.
   * @param forStorageId limit results to this storage only
   * @param drop skip this many results
   * @param take return at most this many results
   * @return a Future containing a sequence of results
   */
  def findByFilename(target:Path, forStorageId:Option[Int], drop:Int=0, take:Int=100):Future[Seq[FileEntry]] = {
    val baseQuery = TableQuery[FileEntryRow].filter(_.filepath === target.toString)
    val finalQuery = forStorageId match {
      case Some(storageId) => baseQuery.filter(_.storage === storageId)
      case None => baseQuery
    }
    db.run {
      finalQuery.sortBy(_.version.desc.nullsLast).drop(drop).take(take).result
    }
  }

  /**
   * returns a streaming source that lists out all files in the database, ordered by mtime ascending,
   * optionally limiting to a given storage ID
   * @param forStorageId if provided, limit to this storage ID only
   * @param onlyWithContent if true, then limit to only returning files that have the 'hasContent' field set. Defaults to True.
   * @return an Akka Source, that yields FileEntry objects
   */
  def scanAllFiles(forStorageId:Option[Int], onlyWithContent:Boolean=true) = {
    val baseQuery = TableQuery[FileEntryRow]
    val storageQuery = forStorageId match {
      case Some(storageId) => baseQuery.filter(_.storage === storageId)
      case None => baseQuery
    }
    val finalQuery = if(onlyWithContent) storageQuery.filter(_.hasContent === true) else storageQuery
    Source.fromPublisher(db.stream(finalQuery.sortBy(_.mtime.asc).result))
  }
}