app/services/ProjectBackupAssetFolder.scala

package services

import akka.stream.Materializer
import akka.stream.scaladsl.{Keep, Sink}
import drivers.{StorageDriver, StorageMetadata}
import helpers.StorageHelper
import models._
import org.slf4j.LoggerFactory
import play.api.Configuration
import play.api.db.slick.DatabaseConfigProvider
import play.api.inject.Injector
import slick.jdbc.PostgresProfile

import java.io.File
import java.nio.file.{Files, Paths}
import java.sql.Timestamp
import java.time.{Duration, Instant, LocalDateTime}
import javax.inject.{Inject, Singleton}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._
import scala.concurrent.{Await, Future}
import scala.language.postfixOps
import scala.util.{Failure, Success, Try}

@Singleton
class ProjectBackupAssetFolder @Inject()(config:Configuration, dbConfigProvider: DatabaseConfigProvider, storageHelper:StorageHelper)
                                        (implicit mat:Materializer, fileEntryDAO:FileEntryDAO, assetFolderFileEntryDAO: AssetFolderFileEntryDAO, injector: Injector){
  private val logger = LoggerFactory.getLogger(getClass)
  private implicit lazy val db = dbConfigProvider.get[PostgresProfile].db

  import ProjectBackupAssetFolder._

  /**
    * Initialises a StorageDriver for every storage in the system
    * @return a Future containing a map relating the storage ID to its StorageDriver
    */
  def loadStorageDrivers(): Future[Map[Int, StorageDriver]] = {
    StorageEntryHelper.allStorages.flatMap({
      case Failure(err)=> Future.failed(err)
      case Success(storages)=>
        Future(
          storages
            .map(e=>(e.id, e.getStorageDriver))
            .collect({case (Some(id), Some(drv))=>(id, drv)})
            .toMap
        )
    })
  }

  /**
    * Caches all of the storages from the database in memory
    * @return A map relating the storage ID to the full record.
    */
  def loadAllStorages():Future[Map[Int, StorageEntry]] = {
    StorageEntryHelper.allStorages.flatMap({
      case Success(entries)=> Future(entries.map(s=>(s.id.get, s)).toMap)
      case Failure(err)=> Future.failed(err)
    })
  }

  val timeSuffixes = Seq("seconds","minutes","hours","days")

  def getTimeDifference(maybeSourceMeta:Option[StorageMetadata], destMeta:StorageMetadata) = maybeSourceMeta match {
    case Some(sourceMeta) =>
      val secondsDelta = Duration.between(sourceMeta.lastModified, destMeta.lastModified).getSeconds
      if (secondsDelta < 60) {
        s"$secondsDelta seconds"
      } else {
        val minsDelta = secondsDelta / 60.0
        if (minsDelta < 60) {
          s"$minsDelta minutes"
        } else {
          val hoursDelta = minsDelta / 60
          if (hoursDelta < 24) {
            s"$hoursDelta hours"
          } else {
            val daysDelta = hoursDelta / 24.0
            s"$daysDelta days"
          }
        }
      }
    case None=> "Can not get time difference, no source metadata present"
  }
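
  /* A minimal sketch, not part of the original file: the two caches above are normally resolved
   * together before a backup pass (backupProjects below does exactly this in its for-comprehension),
   * so the drivers and storage records can be passed around instead of being re-queried per file.
   * The method name is illustrative only. */
  private def exampleLoadCaches(): Future[(Map[Int, StorageDriver], Map[Int, StorageEntry])] =
    for {
      drivers  <- loadStorageDrivers()
      storages <- loadAllStorages()
    } yield (drivers, storages)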
  /**
    * Checks to find the next available (not existing) version number on the storage
    * @param destStorage Destination storage that the AssetFolderFileEntry will be written to. The storage driver associated with this
    *                    storage is then used for lookup.
    * @param intendedTarget AssetFolderFileEntry with `version` set to the initial estimate of what the version should be
    */
  protected def findAvailableVersion(destStorage:StorageEntry, intendedTarget:AssetFolderFileEntry) = {
    destStorage.getStorageDriver match {
      case Some(driver)=>
        implicit val drv:StorageDriver = driver

        def findAvailable(target:AssetFolderFileEntry):Future[AssetFolderFileEntry] = {
          target.validatePathExistsDirect.flatMap({
            case true=>
              logger.debug(s"${target.filepath} ${target.version} exists on ${destStorage}, trying next version")
              findAvailable(target.copy(version = target.version+1))
            case false=>
              logger.debug(s"${target.filepath} ${target.version} does not exist on $destStorage")
              Future(target)
          })
        }

        findAvailable(intendedTarget)
      case None=>Future.failed(new RuntimeException(s"No storage driver available for storage ${destStorage.id}"))
    }
  }

  /**
    * Returns an AssetFolderFileEntry indicating a target file to write.
    * This is guaranteed to be on the destination storage given.
    * - If the destination storage supports versioning, then it is guaranteed not to exist yet (previous dest entry with the version field incremented).
    * - If the destination storage does NOT support versioning, then it will be identical to the "previous" dest entry provided
    * - If there is no "previous" destination then a new entry will be created from the source entry's metadata
    * - If the source entry does not exist then that's an error
    *
    * @param maybeSourceFileEntry Option containing the source file entry
    * @param maybePrevDestEntry Optional destination of the previous iteration
    * @param destStorage Destination storage
    * @return A Future containing an AssetFolderFileEntry to write to. This should be saved to the database before proceeding to write.
    */
  def ascertainTarget(maybeSourceFileEntry:Option[AssetFolderFileEntry], maybePrevDestEntry:Option[AssetFolderFileEntry], destStorage:StorageEntry):Future[AssetFolderFileEntry] = {
    logger.debug(s"In ascertainTarget. maybePrevDestEntry - ${maybePrevDestEntry}")
    (maybeSourceFileEntry, maybePrevDestEntry) match {
      case (Some(sourceEntry), Some(prevDestEntry))=>
        logger.debug(s"${sourceEntry.filepath}: prevDestEntry is $prevDestEntry")
        if(destStorage.supportsVersions) {
          val intendedTarget = prevDestEntry.copy(id=None, storageId=destStorage.id.get, version = prevDestEntry.version+1, mtime=Timestamp.from(Instant.now()), atime=Timestamp.from(Instant.now()), backupOf = sourceEntry.id) // check
          findAvailableVersion(destStorage, intendedTarget)
            .map(correctedTarget=>{
              logger.debug(s"Destination storage ${destStorage.id} ${destStorage.rootpath} supports versioning, nothing will be over-written. Target version number is ${correctedTarget.version}")
              correctedTarget
            })
        } else {
          logger.warn(s"Backup destination storage ${destStorage.id} ${destStorage.rootpath} does not support versioning, so last backup will get over-written")
          Future(prevDestEntry.copy(
            mtime=Timestamp.from(Instant.now()),
            atime=Timestamp.from(Instant.now())
          ))
        }
      case (Some(sourceEntry), None)=>
        logger.debug(s"${sourceEntry.filepath}: no prev dest entry")
        Future(
          sourceEntry.copy(id=None, storageId=destStorage.id.get, version=1, mtime=Timestamp.from(Instant.now()), atime=Timestamp.from(Instant.now()), backupOf = sourceEntry.id)
        )
      case (None, _)=>
        throw new RuntimeException("Can not back up as source file was not found") //Fail the Future
    }
  }
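
  /* A minimal sketch, not part of the original file: for a destination that supports versioning,
   * ascertainTarget bumps the previous backup's version and then probes the storage via
   * findAvailableVersion, so the version it returns is guaranteed not to exist yet. The method
   * name and parameters here are illustrative only. */
  private def exampleNextBackupVersion(source: AssetFolderFileEntry,
                                       previousBackup: Option[AssetFolderFileEntry],
                                       destStorage: StorageEntry): Future[Int] =
    ascertainTarget(Some(source), previousBackup, destStorage).map(_.version)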
  private def getTargetFileEntry(sourceEntry:AssetFolderFileEntry, maybePrevDestEntry:Option[AssetFolderFileEntry], destStorage:StorageEntry):Future[AssetFolderFileEntry] = {
    logger.debug(s"In getTargetFileEntry. maybePrevDestEntry: ${maybePrevDestEntry}")
    for {
      targetDestEntry <- ascertainTarget(Some(sourceEntry), maybePrevDestEntry, destStorage) //If our destStorage supports versioning, then we get a new entry here
      updatedEntryTry <- assetFolderFileEntryDAO.save(targetDestEntry) //Make sure that we get the updated database id of the file
      updatedDestEntry <- Future
        .fromTry(updatedEntryTry)
        .recoverWith({
          case err:org.postgresql.util.PSQLException=>
            logger.warn(s"While trying to make the target entry, caught exception of type ${err.getClass.getCanonicalName} with message ${err.getMessage}")
            if(err.getMessage.contains("duplicate key value violates unique constraint")) {
              logger.warn(s"Pre-existing file entry detected for ${targetDestEntry.filepath} v${targetDestEntry.version} on storage ${targetDestEntry.storageId}, recovering it")
              assetFolderFileEntryDAO
                .singleEntryFor(targetDestEntry.filepath, targetDestEntry.storageId, targetDestEntry.version)
                .map({
                  case Some(entry)=>entry
                  case None=>
                    logger.error(s"Got a conflict exception when trying to create a new record for ${targetDestEntry.filepath} v${targetDestEntry.version} on storage ${targetDestEntry.storageId} but no previous record existed?")
                    throw new RuntimeException("Database conflict problem, see logs")
                })
            } else {
              Future.failed(err)
            }
        })
    } yield updatedDestEntry
  }

  def getListOfFiles(dir: File): Array[File] = {
    val filesList = dir.listFiles
    val res = filesList ++ filesList.filter(_.isDirectory).flatMap(getListOfFiles)
    res.filter { f =>
      f.isFile &&
        (f.getName.endsWith(".cpr") || f.getName.endsWith(".sesx")) &&
        !f.getName.startsWith("._") &&
        !f.getPath.contains("/Backup/") &&
        !f.getPath.contains("/RestoredProjectFiles/")
    }
  }

  def getAssetFolderProjectFilePaths(name: String): Array[String] = {
    val fileArray = getListOfFiles(new File(name))
    fileArray.map(_.getAbsolutePath)
  }

  def getMostRecentEntryForProject(projectId: Int, storage: Int, filePath: String): Future[AssetFolderFileEntry] = {
    assetFolderFileEntryDAO.entryForLatestVersionByProject(projectId, storage, filePath).map {
      _.get
    }
  }
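
  /* A minimal sketch, not part of the original file, of what the directory scan above yields:
   * only project files with the .cpr or .sesx extensions are returned, while "._" sidecar files
   * and anything under /Backup/ or /RestoredProjectFiles/ are skipped. The asset folder path and
   * method name used here are purely hypothetical. */
  private def exampleScanAssetFolder(): Array[String] =
    getAssetFolderProjectFilePaths("/srv/assetfolders/example-project") //hypothetical path, for illustration only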
  def findMostRecentBackup(potentialBackups:Seq[AssetFolderFileEntry], p:ProjectEntry, storageDrivers:Map[Int, StorageDriver]) = {
    potentialBackups.map(backup=>{
      storageDrivers.get(backup.storageId) match {
        case Some(destDriver)=>
          val bMeta = destDriver.getMetadata(backup.filepath, backup.version)
          logger.debug(s"Project ${p.projectTitle} (${p.id.get}) backup file ${backup.filepath} v${backup.version} metadata: $bMeta")
          bMeta.map(m=>(backup, m))
        case None=>
          logger.error(s"Project ${p.projectTitle} (${p.id.get}) Could not get a destination driver for storage ${backup.storageId} on file ${backup.filepath} v${backup.version}")
          None
      }
    })
      .collect({case Some(result)=>result})
      .sortBy(_._2.lastModified.toInstant.toEpochMilli)(Ordering.Long.reverse)
      .map(entries=>{
        logger.debug(s"Ordered entry: ${entries._1.filepath} ${entries._1.version} ${entries._2.lastModified} ${entries._2.size}")
        entries
      })
      .headOption
  }

  def shouldCopy(projectId: Int, storage: Int, p: ProjectEntry, storageDrivers:Map[Int, StorageDriver], sourceFile: AssetFolderFileEntry): Boolean = {
    try {
      val mostRecentEntry = Await.result(getMostRecentEntryForProject(projectId, storage, sourceFile.filepath), 10 seconds)
      logger.debug(s"Most recent version for project $projectId is ${mostRecentEntry.version}")
      val sourceMetaTwo = storageDrivers.get(sourceFile.storageId) match {
        case Some(sourceDriver) => sourceDriver.getMetadata(sourceFile.filepath, sourceFile.version)
      }
      findMostRecentBackup(Seq(mostRecentEntry), p, storageDrivers) match {
        case None =>
          logger.info(s"Project ${p.projectTitle} (${p.id.get}) most recent metadata was empty, could not check sizes. Assuming that the file is not present and needs backup")
          true
        case Some((fileEntry, meta)) =>
          logger.info(s"Project ${p.projectTitle} (${p.id.get}) Most recent backup leads source file by ${getTimeDifference(sourceMetaTwo, meta)}")
          if (meta.size == sourceMetaTwo.get.size) {
            logger.info(s"Project ${p.projectTitle} (${p.id.get}) Most recent backup version ${fileEntry.version} matches source, no backup required")
            false
          } else {
            logger.info(s"Project ${p.projectTitle} (${p.id.get}) Most recent backup version ${fileEntry.version} size mismatch ${sourceMetaTwo.get.size} vs ${meta.size}, backup needed")
            true
          }
      }
    } catch {
      case e: Throwable =>
        logger.debug(s"Problem attempting to get old version. Likely there was none present. Error: $e")
        true
    }
  }
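
  /* A minimal sketch, not part of the original file, of how the decision above is applied later in
   * backupProjects: a versioned backup storage only gets a new copy when shouldCopy detects a size
   * difference (or no existing backup), whereas an unversioned storage is always overwritten.
   * The method name and parameters here are illustrative only. */
  private def exampleCopyDecision(project: ProjectEntry,
                                  backupStorage: StorageEntry,
                                  drivers: Map[Int, StorageDriver],
                                  sourceFile: AssetFolderFileEntry): Boolean =
    if (backupStorage.supportsVersions)
      shouldCopy(project.id.get, backupStorage.id.get, project, drivers, sourceFile)
    else
      true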
  def getOldVersionEntry(projectId: Int, storage: Int, p: ProjectEntry, storageDrivers:Map[Int, StorageDriver], sourceFile: AssetFolderFileEntry): Try[Either[String, Option[AssetFolderFileEntry]]] = {
    logger.debug(s"getOldVersionEntry run with projectId: $projectId, storage: $storage, path: ${sourceFile.filepath}")
    try {
      val mostRecentEntry = Await.result(getMostRecentEntryForProject(projectId, storage, sourceFile.filepath), 10 seconds)
      val sourceMetaTwo = storageDrivers.get(sourceFile.storageId) match {
        case Some(sourceDriver) => sourceDriver.getMetadata(sourceFile.filepath, sourceFile.version)
      }
      findMostRecentBackup(Seq(mostRecentEntry), p, storageDrivers) match {
        case None =>
          logger.debug(s"getOldVersionEntry: Project ${p.projectTitle} (${p.id.get}) most recent metadata was empty, could not check sizes. Assuming that the file is not present and needs backup")
          Success(Right(None))
        case Some((fileEntry, meta)) =>
          if (meta.size == sourceMetaTwo.get.size) {
            logger.debug(s"getOldVersionEntry: Project ${p.projectTitle} (${p.id.get}) Most recent backup version ${fileEntry.version} matches source, no backup required")
            Success(Right(Some(fileEntry)))
          } else {
            logger.debug(s"getOldVersionEntry: Project ${p.projectTitle} (${p.id.get}) Most recent backup version ${fileEntry.version} size mismatch ${sourceMetaTwo.get.size} vs ${meta.size}, backup needed")
            Success(Right(Some(fileEntry)))
          }
      }
    } catch {
      case e: Throwable =>
        logger.debug(s"Problem attempting to get old version. Likely there was none present. Error: $e")
        Success(Right(None))
    }
  }
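
  /* A minimal sketch, not part of the original file: backupProjects below flattens the
   * Try[Either[String, Option[...]]] returned above down to the Option that getTargetFileEntry
   * expects, treating any failure or Left as "no previous backup". The method name and parameters
   * here are illustrative only. */
  private def examplePreviousBackup(project: ProjectEntry,
                                    backupStorageId: Int,
                                    drivers: Map[Int, StorageDriver],
                                    sourceFile: AssetFolderFileEntry): Option[AssetFolderFileEntry] =
    getOldVersionEntry(project.id.get, backupStorageId, project, drivers, sourceFile) match {
      case Success(Right(maybeMostRecentBackup)) => maybeMostRecentBackup
      case _ => None
    }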
  def backupProjects(onlyByType:Boolean):Future[BackupResults] = {
    val parallelCopies = config.getOptional[Int]("backup.parallelCopies").getOrElse(1)
    val makeFoldersSetting = config.getOptional[Boolean]("asset_folder_backup_make_folders").getOrElse(true)

    var backupTypes = Array(2,3,4,6)
    val backupTypesSequence = config.getOptional[Seq[Int]]("asset_folder_backup_types").getOrElse(None)
    if (backupTypesSequence != None) {
      backupTypes = backupTypesSequence.iterator.toArray
    }

    def getScanSource() = if(onlyByType) {
      ProjectEntry.scanProjectsForTypes(backupTypes)
    } else {
      ProjectEntry.scanAllProjects
    }

    def getAssetFolder(id:Int) = ProjectMetadata.entryFor(id, ProjectMetadata.ASSET_FOLDER_KEY)

    def getDestFileFor(filePath:String, recordTimestamp:Timestamp, projectId:Option[Int], assetFolderStorage:Int)(implicit db: slick.jdbc.PostgresProfile#Backend#Database): Future[AssetFolderFileEntry] =
      assetFolderFileEntryDAO.entryFor(filePath, 1).map({
        case Success(filesList) =>
          if (filesList.isEmpty) {
            //No file entries exist already, create one (at version 1) and proceed
            assetFolderFileEntryDAO.save(AssetFolderFileEntry(None, filePath, assetFolderStorage, 1, recordTimestamp, recordTimestamp, recordTimestamp, projectId, backupOf = None))
            AssetFolderFileEntry(None, filePath, assetFolderStorage, 1, recordTimestamp, recordTimestamp, recordTimestamp, projectId, backupOf = None)
          } else {
            filesList.head
          }
        case Failure(error) => throw error
      })

    for {
      drivers <- loadStorageDrivers()
      storages <- loadAllStorages()
      result <- getScanSource()
        .map(p => {
          logger.debug(s"Checking project ${p.projectTitle} for backup. Type: ${p.projectTypeId}")
          p
        })
        .mapAsync(parallelCopies)(p => {
          logger.debug(s"Got project id: ${p.id.get}")
          getAssetFolder(p.id.get).onComplete(folderData => folderData match {
            case Failure(error) => logger.debug(s"Attempt at getting asset folder path failed: $error")
            case Success(assetFolderData) =>
              try {
                logger.debug(s"Got asset folder path: ${assetFolderData.get.value.get}")
                val fileArray = getAssetFolderProjectFilePaths(assetFolderData.get.value.get)
                fileArray.foreach(filePath => {
                  logger.debug(s"File path: $filePath")
                  val assetFolderStorage = config.getOptional[Int]("asset_folder_storage").getOrElse(1)
                  logger.debug(s"Storage to access: $assetFolderStorage")
                  val storageObject = StorageEntryHelper.entryFor(assetFolderStorage).onComplete(storageData => storageData match {
                    case Failure(error) => logger.debug(s"Attempt at getting storage data failed: $error")
                    case Success(storageData) =>
                      val rootPath = storageData.get.rootpath.get
                      logger.debug(s"Root: $rootPath")
                      val fixedPath = filePath.replace(rootPath,"")
                      val recordTimestamp = Timestamp.valueOf(LocalDateTime.now())
                      val assetFolderFileDest = getDestFileFor(fixedPath, recordTimestamp, p.id, assetFolderStorage)
                      val assetFolderBackupStorage = config.getOptional[Int]("asset_folder_backup_storage").getOrElse(1)
                      logger.debug(s"Storage to use: $assetFolderBackupStorage")
                      assetFolderFileDest.map(fileEntry=> {
                        val storageSupportsVersions = storages(assetFolderBackupStorage).supportsVersions
                        logger.debug(s"Back up storage supports versions: $storageSupportsVersions")
                        var attemptCopy = false
                        if (storageSupportsVersions) {
                          attemptCopy = shouldCopy(p.id.get, assetFolderBackupStorage, p, drivers, fileEntry)
                        } else {
                          attemptCopy = true
                        }
                        if (attemptCopy) {
                          val possiblyOldVersionEntry = getOldVersionEntry(p.id.get, assetFolderBackupStorage, p, drivers, fileEntry) match {
                            case Failure(err) => None
                            case Success(Left(msg)) => None
                            case Success(Right(maybeMostRecentBackup)) => maybeMostRecentBackup
                          }
                          logger.debug(s"possiblyOldVersionEntry: $possiblyOldVersionEntry for path: $filePath")
                          getTargetFileEntry(fileEntry, possiblyOldVersionEntry, storages.get(assetFolderBackupStorage).get).onComplete(fileDest => fileDest match {
                            case Failure(error) => logger.debug(s"Attempt at getting file data failed: $error")
                            case Success(destData) =>
                              destData.getFullPath.onComplete {
                                case Failure(exception) => logger.debug(s"Attempt at getting path failed: $exception")
                                case Success(pathData) =>
                                  if (makeFoldersSetting) {
                                    logger.debug(s"Write path: $pathData")
                                    val pathFolder = Paths.get(pathData).getParent.toString
                                    logger.debug(s"Write folder: $pathFolder")
                                    Files.createDirectories(Paths.get(pathFolder))
                                  }
                                  storageHelper.copyAssetFolderFile(fileEntry, destData)
                              }
                          })
                        }
                      })
                  })
                  Thread.sleep(config.getOptional[Long]("backup.pauseMilliseconds").getOrElse(0))
                })
              } catch {
                case e: java.lang.NullPointerException => logger.debug(s"Could not find any project files to process.")
                case e: java.util.NoSuchElementException => logger.debug(s"Could not find an asset folder path.")
              }
          })
          Thread.sleep(200)
          Future(Right(true))
        })
        //Fold each project's outcome into a single BackupResults tally
        .toMat(Sink.fold(BackupResults.empty(0))((acc, elem) => elem match {
          case Right(true) => acc.copy(totalCount = acc.totalCount + 1, successCount = acc.successCount + 1)
          case Right(false) => acc.copy(totalCount = acc.totalCount + 1, notNeededCount = acc.notNeededCount + 1)
        }))(Keep.right)
        .run()
    } yield result
  }
}

object ProjectBackupAssetFolder {
  case class BackupResults(storageId:Int, totalCount:Long, failedCount:Long, successCount:Long, notNeededCount:Long)

  object BackupResults {
    def empty(storageId:Int) = new BackupResults(storageId, 0,0,0,0)
  }
}
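
/* A minimal usage sketch, not part of the original file, assuming an injected
 * ProjectBackupAssetFolder instance (for example from a scheduled job). The object and parameter
 * names are illustrative only; it runs one pass restricted to the configured project types and
 * logs the aggregated BackupResults. */
object ExampleAssetFolderBackupRunner {
  import ProjectBackupAssetFolder.BackupResults

  private val logger = LoggerFactory.getLogger(getClass)

  def runOnce(backupService: ProjectBackupAssetFolder): Future[BackupResults] =
    backupService.backupProjects(onlyByType = true).map { results =>
      logger.info(s"Asset folder backup completed: ${results.successCount} copied, " +
        s"${results.notNeededCount} not needed, ${results.failedCount} failed out of ${results.totalCount}")
      results
    }
}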