app/helpers/StorageHelper.scala (219 lines of code) (raw):
package helpers
import akka.stream.Materializer
import drivers.StorageMetadata
import helpers.StorageHelper.defaultBufferSize
import models.{AssetFolderFileEntry, AssetFolderFileEntryDAO, FileEntry, FileEntryDAO, StorageEntry}
import org.slf4j.{LoggerFactory, MDC}
import play.api.inject.Injector
import java.io.{EOFException, InputStream, OutputStream}
import java.nio.ByteBuffer
import javax.inject.Inject
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.Future
import scala.util.{Failure, Success, Try}
object StorageHelper {
private val logger = LoggerFactory.getLogger(getClass)
val defaultBufferSize:Int = 10*1024*1024 //10Mbyte copy buffer
/**
* utility function to directly copy from one stream to another
* @param input InputStream to read from
* @param output OutputStream to write to
* @param bufferSize size of the temporary buffer to use.
* @return the number of bytes written as a Long. Closes the streams when it is done. Raises exceptions on failure (assumed it's within a try/catch block)
*/
def copyStream(input:InputStream, output:OutputStream, bufferSize:Int=defaultBufferSize) = {
val buf=ByteBuffer.allocate(bufferSize)
var bytesRead: Int = 0
var totalRead: Long = 0
try {
do {
bytesRead = input.read(buf.array())
if(bytesRead == -1) throw new EOFException
totalRead += bytesRead
buf.flip()
output.write(buf.array(),0,bytesRead)
buf.clear()
} while (bytesRead > 0)
output.close()
input.close()
totalRead
} catch {
case _:EOFException=>
logger.debug(s"Stream copy reached EOF")
totalRead
}
}
}
class StorageHelper @Inject() (implicit mat:Materializer, injector:Injector, fileEntryDAO: FileEntryDAO, assetFolderFileEntryDAO: AssetFolderFileEntryDAO) {
private val logger = LoggerFactory.getLogger(getClass)
/**
* proxy for the static copyStream method so that it can be mocked in testing
*/
protected def callCopyStream(input:InputStream, output:OutputStream, bufferSize:Int=defaultBufferSize) = StorageHelper.copyStream(input, output, bufferSize)
def deleteFile(targetFile: FileEntry)(implicit db:slick.jdbc.PostgresProfile#Backend#Database):Future[Either[String, FileEntry]] = {
val futures = Future.sequence(Seq(targetFile.storage, targetFile.getFullPath))
futures.map(results=>{
val storageResult = results.head.asInstanceOf[Option[StorageEntry]]
MDC.put("storageResult", storageResult.toString)
val fullPath = results(1).asInstanceOf[String]
MDC.put("fullPath", fullPath)
storageResult match {
case Some(storageEntry) =>
storageEntry.getStorageDriver match {
case Some(storageDriver) =>
MDC.put("storageDriver", storageDriver.toString)
storageDriver.deleteFileAtPath(fullPath, targetFile.version) match {
case true=>
val updatedFileEntry = targetFile.copy(hasContent = false)
updatedFileEntry.save
Right(updatedFileEntry)
case false=>
Left("storage driver failed to delete file")
}
case None =>
logger.error(s"Can't delete file at $fullPath because storage $storageEntry has no storage driver")
Left("No storage driver configured, enable debugging for helpers.StorageHelper for more info")
}
case None =>
logger.error(s"Can't delete file at $fullPath because file record has no storage")
Left("No storage reference on record, enable debugging for helpers.StorageHelper for more info")
}
})
}
/**
* Copies from the file represented by sourceFile to the (non-existing) file represented by destFile.
* Both should have been saved to the database before calling this method. The files do not need to be on the same
* storage type
* @param sourceFile - [[models.FileEntry]] instance representing file to copy from
* @param destFile - [[models.FileEntry]] instance representing file to copy to
* @param db - database instance, usually passed implicitly.
* @return Future[FileEntry] - a future containing e new, updated [[models.FileEntry]] representing @destFile.
* this future fails if there was an error
*/
def copyFile(sourceFile: FileEntry, destFile:FileEntry)
(implicit db:slick.jdbc.PostgresProfile#Backend#Database):Future[FileEntry] = {
def getStorageDriverForFile(file:FileEntry) = {
file
.storage
.map(_.flatMap(_.getStorageDriver))
.map({
case Some(storageDriver) => storageDriver
case None => throw new RuntimeException(s"Storage with ID ${file.storageId} does not have a valid storage type")
})
}
def withReadStream[A](sourceFile:FileEntry)(cb:(Option[StorageMetadata], InputStream)=>Try[A]) = {
val readStreamFut = for {
driver <- getStorageDriverForFile(sourceFile)
fullPath <- sourceFile.getFullPath
readStream <- Future.fromTry(driver.getReadStream(fullPath, sourceFile.version))
meta <- Future.fromTry(Try { driver.getMetadata(fullPath, sourceFile.version)})
} yield (driver, readStream, meta)
readStreamFut.map({
case (driver, readStream, meta)=>
val result = cb(meta, readStream)
Try { readStream.close() } match {
case Success(_)=>
case Failure(err)=>
logger.error(s"Could not close file $sourceFile via driver $driver: ${err.getMessage}", err)
}
Future.fromTry(result)
}).flatten
}
val destination = for {
destFilePath <- destFile.getFullPath
destStorageDriver <- getStorageDriverForFile(destFile)
} yield (destFilePath, destStorageDriver)
destination.flatMap({
case (destFilePath, destDriver)=>
withReadStream(sourceFile) { (sourceMeta,readStream)=>
destDriver
.writeDataToPath(destFilePath, destFile.version, readStream)
.flatMap(_=> {
//now that the copy completed successfully, we need to check that the file sizes actually match
destDriver.getMetadata(destFilePath, destFile.version) match {
case None =>
Failure(new RuntimeException(s"${sourceFile.filepath}: Could not get destination file metadata"))
case Some(meta)=>
logger.debug(s"${sourceFile.filepath}: Destination size is ${meta.size} and source size is ${sourceMeta.get.size}")
if(meta.size==sourceMeta.get.size) {
Success( () )
} else {
Failure(new RuntimeException(s"${sourceFile.filepath}: Copied file size ${meta.size} did not match source size of ${sourceMeta.get.size}"))
}
}
})
}.map(_=>destFile.copy(hasContent=true))
})
}
def findFile(targetFile: FileEntry)(implicit db:slick.jdbc.PostgresProfile#Backend#Database) = {
val futures = Future.sequence(Seq(targetFile.storage, targetFile.getFullPath))
futures.map(futureResults=>{
val maybeStorage = futureResults.head.asInstanceOf[Option[StorageEntry]]
val fullPath = futureResults(1).asInstanceOf[String]
val maybeStorageDriver = maybeStorage.flatMap(_.getStorageDriver)
maybeStorageDriver match {
case Some(storageDriver)=>
storageDriver.pathExists(fullPath, targetFile.version)
case None=>
throw new RuntimeException(s"No storage driver defined for ${maybeStorage.map(_.repr).getOrElse("unknown storage")}")
}
})
}
def onStorageMetadata(targetFile: FileEntry)(implicit db:slick.jdbc.PostgresProfile#Backend#Database) = {
targetFile.storage.map(maybeStorage=>{
val maybeStorageDriver = maybeStorage.flatMap(_.getStorageDriver)
maybeStorageDriver match {
case Some(storageDriver)=>
storageDriver.getMetadata(targetFile.filepath, targetFile.version)
case None=>
throw new RuntimeException(s"No storage driver defined for ${maybeStorage.map(_.repr).getOrElse("unknown storage")}")
}
})
}
def copyAssetFolderFile(sourceFile: AssetFolderFileEntry, destFile:AssetFolderFileEntry)
(implicit db:slick.jdbc.PostgresProfile#Backend#Database):Future[AssetFolderFileEntry] = {
logger.debug(s"copyAssetFolderFile running with source of $sourceFile")
def getStorageDriverForFile(file:AssetFolderFileEntry) = {
file
.storage
.map(_.flatMap(_.getStorageDriver))
.map({
case Some(storageDriver) => storageDriver
case None => throw new RuntimeException(s"Storage with id. ${file.storageId} does not have a valid storage type")
})
}
def withReadStream[A](sourceFile:AssetFolderFileEntry)(cb:(Option[StorageMetadata], InputStream)=>Try[A]) = {
val readStreamFut = for {
driver <- getStorageDriverForFile(sourceFile)
fullPath <- sourceFile.getFullPath
readStream <- Future.fromTry(driver.getReadStream(fullPath, sourceFile.version))
meta <- Future.fromTry(Try { driver.getMetadata(fullPath, sourceFile.version)})
} yield (driver, readStream, meta)
readStreamFut.map({
case (driver, readStream, meta)=>
val result = cb(meta, readStream)
Try { readStream.close() } match {
case Success(_)=>
case Failure(err)=>
logger.error(s"Could not close file $sourceFile via driver $driver: ${err.getMessage}", err)
}
Future.fromTry(result)
}).flatten
}
val destination = for {
destFilePath <- destFile.getFullPath
destStorageDriver <- getStorageDriverForFile(destFile)
} yield (destFilePath, destStorageDriver)
destination.flatMap({
case (destFilePath, destDriver)=>
withReadStream(sourceFile) { (sourceMeta,readStream)=>
destDriver
.writeDataToPath(destFilePath, destFile.version, readStream)
.flatMap(_=> {
//Now that the copy completed successfully, we need to check that the file sizes actually match
destDriver.getMetadata(destFilePath, destFile.version) match {
case None =>
logger.error(s"${sourceFile.filepath}: Could not get destination file metadata")
Failure(new RuntimeException(s"${sourceFile.filepath}: Could not get destination file metadata"))
case Some(meta)=>
logger.debug(s"${sourceFile.filepath}: Destination size is ${meta.size} and source size is ${sourceMeta.get.size}")
if(meta.size==sourceMeta.get.size) {
Success( () )
} else {
Failure(new RuntimeException(s"${sourceFile.filepath}: Copied file size ${meta.size} did not match source size of ${sourceMeta.get.size}"))
}
}
})
}.map(_=>destFile.copy())
})
}
def assetFolderOnStorageMetadata(targetFile: AssetFolderFileEntry)(implicit db:slick.jdbc.PostgresProfile#Backend#Database) = {
targetFile.storage.map(maybeStorage=>{
val maybeStorageDriver = maybeStorage.flatMap(_.getStorageDriver)
maybeStorageDriver match {
case Some(storageDriver)=>
storageDriver.getMetadata(targetFile.filepath, targetFile.version)
case None=>
throw new RuntimeException(s"No storage driver defined for ${maybeStorage.map(_.repr).getOrElse("unknown storage")}")
}
})
}
}