app/services/NewProjectBackup.scala (468 lines of code) (raw):
package services
import akka.stream.Materializer
import akka.stream.scaladsl.{Keep, Sink}
import drivers.{StorageDriver, StorageMetadata}
import helpers.StorageHelper
import models._
import org.slf4j.LoggerFactory
import play.api.Configuration
import play.api.db.slick.DatabaseConfigProvider
import play.api.inject.Injector
import slick.jdbc.PostgresProfile
import slick.jdbc.PostgresProfile.api._
import java.sql.Timestamp
import java.time.{Duration, Instant}
import javax.inject.{Inject, Singleton}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.Future
import scala.util.{Failure, Success, Try}
/**
 * Service that backs up project files onto the backup storage configured for their source
 * storage, and provides maintenance jobs that remove invalid (zero-length) backups and
 * audio-session backups.
 * Injected with Play configuration, a Slick database provider and a StorageHelper; requires
 * a Materializer for running Akka streams and a FileEntryDAO for file record persistence.
 */
@Singleton
class NewProjectBackup @Inject() (config:Configuration, dbConfigProvider: DatabaseConfigProvider, storageHelper:StorageHelper)
(implicit mat:Materializer, fileEntryDAO:FileEntryDAO, injector: Injector){
private val logger = LoggerFactory.getLogger(getClass)
// lazy so the database handle is only obtained on first use
private implicit lazy val db = dbConfigProvider.get[PostgresProfile].db
import NewProjectBackup._
/**
 * Initialises a StorageDriver for every storage registered in the system.
 * Storages that lack an id or a usable driver are dropped from the result.
 * @return a Future of a map relating storage ID to its StorageDriver; fails if the storage list cannot be loaded
 */
def loadStorageDrivers(): Future[Map[Int, StorageDriver]] =
  StorageEntryHelper.allStorages.flatMap({
    case Success(storages) =>
      // Pair every storage with its (optional) driver, then keep only the fully-defined pairs
      val candidates = storages.map(entry => (entry.id, entry.getStorageDriver))
      Future(
        candidates
          .collect({ case (Some(id), Some(driver)) => id -> driver })
          .toMap
      )
    case Failure(err) =>
      Future.failed(err)
  })
/**
 * Caches all of the storages in-memory from the database.
 * @return a Future of a map relating the storage ID to the full record; fails if the storage list cannot be loaded
 */
def loadAllStorages(): Future[Map[Int, StorageEntry]] =
  StorageEntryHelper.allStorages.flatMap({
    case Failure(err) =>
      Future.failed(err)
    case Success(entries) =>
      // NOTE(review): assumes every storage record coming from the database has an id set
      Future(entries.map(record => record.id.get -> record).toMap)
  })
// Unit names from smallest to largest. NOTE(review): not referenced anywhere in this file — confirm before removing.
val timeSuffixes = Seq("seconds","minutes","hours","days")
/**
 * Renders the time difference between the source file's and the backup's last-modified
 * timestamps as a human-readable string; used for logging only.
 * Minutes and days are computed with floating-point division so the rendered value can be
 * fractional (e.g. "1.5 minutes"); hours is derived from the fractional minutes value.
 * @param maybeSourceMeta optional metadata of the source file; when None a placeholder message is returned
 * @param destMeta metadata of the backup copy
 * @return descriptive string such as "42 seconds" or "2.5 hours"
 */
def getTimeDifference(maybeSourceMeta:Option[StorageMetadata], destMeta:StorageMetadata) =
maybeSourceMeta match {
case Some(sourceMeta) =>
val secondsDelta = Duration.between(sourceMeta.lastModified, destMeta.lastModified).getSeconds
if (secondsDelta < 60) {
s"$secondsDelta seconds"
} else {
// floating-point from here on, so sub-unit precision is preserved in the output
val minsDelta = secondsDelta / 60.0
if (minsDelta < 60) {
s"$minsDelta minutes"
} else {
val hoursDelta = minsDelta / 60
if (hoursDelta < 24) {
s"$hoursDelta hours"
} else {
val daysDelta = hoursDelta / 24.0
s"$daysDelta days"
}
}
}
case None=>
"Can't get time difference, no source metadata present"
}
/**
 * Checks to find the next available (not existing) version number on the storage.
 * Walks forward from the intended version until a version is found that does not already
 * exist on the destination storage.
 * @param destStorage destination storage that the FileEntry will be written to. The storage driver associated
 *                    with this storage is used for the lookup.
 * @param intendedTarget FileEntry with `version` set to the initial estimate of what the version should be
 * @return a Future of a FileEntry whose version does not yet exist; fails if no driver is available
 */
protected def findAvailableVersion(destStorage: StorageEntry, intendedTarget: FileEntry) =
  destStorage.getStorageDriver match {
    case Some(driver) =>
      implicit val drv: StorageDriver = driver
      // Recurse, bumping the version number, until the candidate path is free on the storage
      def probe(candidate: FileEntry): Future[FileEntry] =
        candidate.validatePathExistsDirect.flatMap(alreadyExists =>
          if (alreadyExists) {
            logger.debug(s"${candidate.filepath} ${candidate.version} exists on ${destStorage}, trying next version")
            probe(candidate.copy(version = candidate.version + 1))
          } else {
            logger.debug(s"${candidate.filepath} ${candidate.version} does not exist on $destStorage")
            Future(candidate)
          }
        )
      probe(intendedTarget)
    case None =>
      Future.failed(new RuntimeException(s"No storage driver available for storage ${destStorage.id}"))
  }
/**
 * returns a FileEntry indicating a target file to write.
 * This is guaranteed to be on the destination storage given.
 * - if the destination storage supports versioning, then it is guaranteed not to exist yet (previous dest entry with the version field incremented).
 * - If the destination storage does NOT support versioning, then it will be identical to the "previous" dest entry provided
 * - if there is no "previous" destination then a new entry will be created from the source entry's metadata
 * - if the Source entry does not exist then the returned Future fails
 *
 * @param maybeSourceFileEntry option containing the source file entry
 * @param maybePrevDestEntry optional destination of the previous iteration
 * @param destStorage destination storage
 * @return a Future containing a FileEntry to write to. This should be saved to the database before proceeding to write.
 */
def ascertainTarget(maybeSourceFileEntry: Option[FileEntry], maybePrevDestEntry: Option[FileEntry], destStorage: StorageEntry): Future[FileEntry] = {
  // entry tracing — lowered from warn to debug, it is not an anomaly
  logger.debug(s"In ascertainTarget. maybePrevDestEntry - ${maybePrevDestEntry}")
  (maybeSourceFileEntry, maybePrevDestEntry) match {
    case (Some(sourceEntry), Some(prevDestEntry)) =>
      logger.debug(s"${sourceEntry.filepath}: prevDestEntry is $prevDestEntry")
      if (destStorage.supportsVersions) {
        // Start from the previous backup's record, bump the version, and let
        // findAvailableVersion skip over any versions that already exist on disk
        val intendedTarget = prevDestEntry.copy(id = None,
          storageId = destStorage.id.get,
          version = prevDestEntry.version + 1,
          mtime = Timestamp.from(Instant.now()),
          atime = Timestamp.from(Instant.now()),
          hasContent = false,
          hasLink = true,
          backupOf = sourceEntry.id)
        findAvailableVersion(destStorage, intendedTarget)
          .map(correctedTarget => {
            // Fixed: previously logged `correctedTarget.version+1`, mis-reporting the
            // actual target version by one (findAvailableVersion already returns the final version)
            logger.debug(s"Destination storage ${destStorage.id} ${destStorage.rootpath} supports versioning, nothing will be over-written. Target version number is ${correctedTarget.version}")
            correctedTarget
          })
      } else {
        logger.warn(s"Backup destination storage ${destStorage.id} ${destStorage.rootpath} does not support versioning, so last backup will get over-written")
        Future(prevDestEntry.copy(
          mtime = Timestamp.from(Instant.now()),
          atime = Timestamp.from(Instant.now())
        ))
      }
    case (Some(sourceEntry), None) =>
      logger.debug(s"${sourceEntry.filepath}: no prev dest entry")
      // First backup of this file: derive the destination record from the source entry
      Future(
        sourceEntry.copy(id = None,
          storageId = destStorage.id.get,
          version = 1,
          mtime = Timestamp.from(Instant.now()),
          atime = Timestamp.from(Instant.now()),
          hasContent = false,
          hasLink = true,
          backupOf = sourceEntry.id)
      )
    case (None, _) =>
      // Fixed: was a bare `throw`, which escapes synchronously out of callers' flatMap/recover
      // chains (and can fail the surrounding stream stage) instead of failing the Future.
      Future.failed(new RuntimeException("Can't back up as source file was not found"))
  }
}
/**
 * adds a row to the FileAssociationRow table to associate the new backup with the same project as
 * the source file.
 * the `destEntry` MUST have been saved, so it has an `id` attribute set. If not the Future will fail.
 * if the `sourceEntry` does not have an existing project association nothing will be done.
 * @param sourceEntry FileEntry instance that is being copied from
 * @param destEntry FileEntry instance that has just been copied to
 * @return a Future, with an Option containing the number of changed rows if an action was taken.
 */
def makeProjectLink(sourceEntry:FileEntry, destEntry:FileEntry) = {
import cats.implicits._
import slick.jdbc.PostgresProfile.api._
// Inserts a (projectId, fileId) association row for the freshly-written backup.
// NOTE: destEntry.id.get throws if destEntry was never saved — see the contract in the scaladoc.
def addRow(forProjectId:Int) = db.run {
TableQuery[FileAssociationRow] += (forProjectId, destEntry.id.get)
}
sourceEntry.id match {
case Some(sourceId) =>
for {
// look up the project association(s) of the source file
existingLink <- db.run(TableQuery[FileAssociationRow].filter(_.fileEntry === sourceId).result)
// only the first association is copied; _1 presumably holds the project id — TODO confirm column order
result <- existingLink
.headOption
.map(existingAssociation=>addRow(existingAssociation._1))
.sequence //convert Option[Future[A]] into Future[Option[A]] via cats
} yield result
case None=>
// an unsaved source entry cannot be associated with anything
logger.debug(s"File $sourceEntry is not linked to any project")
Future(None)
}
}
/**
 * Ascertains the target FileEntry for a backup and saves it to the database, returning the
 * saved entry complete with its database id.
 * If the save hits a unique-constraint violation (a record for the same filepath/storage/version
 * already exists), the pre-existing record is looked up and returned instead.
 * @param sourceEntry FileEntry representing the file being backed up
 * @param maybePrevDestEntry optional FileEntry of the previous backup iteration
 * @param destStorage storage the backup will be written to
 * @return a Future with the saved destination FileEntry; fails on unrecoverable database errors
 */
private def getTargetFileEntry(sourceEntry:FileEntry, maybePrevDestEntry:Option[FileEntry], destStorage:StorageEntry):Future[FileEntry] = {
logger.warn(s"In getTargetFileEntry. maybePrevDestEntry: ${maybePrevDestEntry}")
for {
targetDestEntry <- ascertainTarget(Some(sourceEntry), maybePrevDestEntry, destStorage) //if our destStorage supports versioning, then we get a new entry here
updatedEntryTry <- fileEntryDAO.save(targetDestEntry) //make sure that we get the updated database id of the file
updatedDestEntry <- Future
.fromTry(updatedEntryTry)
.recoverWith({
case err:org.postgresql.util.PSQLException=>
logger.warn(s"While trying to make the target entry, caught exception of type ${err.getClass.getCanonicalName} with message ${err.getMessage}")
// a duplicate-key violation means a record already occupies this filepath/storage/version slot,
// so recover by fetching and reusing that record
if(err.getMessage.contains("duplicate key value violates unique constraint")) {
logger.warn(s"Pre-existing file entry detected for ${targetDestEntry.filepath} v${targetDestEntry.version} on storage ${targetDestEntry.storageId}, recovering it")
fileEntryDAO
.singleEntryFor(targetDestEntry.filepath, targetDestEntry.storageId, targetDestEntry.version)
.map({
case Some(entry)=>entry
case None=>
// should be impossible: the insert conflicted but no conflicting row can be found
logger.error(s"Got a conflict exception when trying to create a new record for ${targetDestEntry.filepath} v${targetDestEntry.version} on storage ${targetDestEntry.storageId} but no previous record existed?")
throw new RuntimeException("Database conflict problem, see logs")
})
} else {
Future.failed(err)
}
})
} yield updatedDestEntry
}
/**
 * performs the backup operation
 * @param sourceEntry FileEntry representing the file to back up
 * @param maybePrevDestEntry an optional FileEntry representing the _previous_ incremental backup (if there was one).
 * @param destStorage storage to back up onto
 * @return a Future, containing a tuple of two FileEntries. The first is the "written" file, the second is the "source" file. The future
 * fails on error.
 */
def performBackup(sourceEntry:FileEntry, maybePrevDestEntry:Option[FileEntry], destStorage:StorageEntry) = {
logger.warn(s"maybePrevDestEntry: ${maybePrevDestEntry}")
for {
// establish (and persist) the destination record we are going to write to
updatedDestEntry <- getTargetFileEntry(sourceEntry, maybePrevDestEntry, destStorage)
results <- {
logger.warn(s"Backing up ${sourceEntry.filepath} on storage ${sourceEntry.storageId} to ${updatedDestEntry.filepath} v${updatedDestEntry.version} on storage ${updatedDestEntry.storageId}")
storageHelper.copyFile(sourceEntry, updatedDestEntry)
.flatMap(fileEntry=>{
//ensure that we save the record with `b_has_content` set to true
fileEntryDAO.saveSimple(fileEntry).map(finalEntry=>(finalEntry, sourceEntry))
})
.recoverWith({
case err:Throwable=>
logger.error(s"Could not copy ${updatedDestEntry.filepath} on ${updatedDestEntry.storageId} from ${sourceEntry.filepath} on ${sourceEntry.storageId}: ${err.getMessage}",err)
// roll back: remove the partial file from disk and its database record before failing the Future
fileEntryDAO
.deleteFromDisk(updatedDestEntry)
.andThen(_=>fileEntryDAO.deleteRecord(updatedDestEntry))
.flatMap(_=>Future.failed(new RuntimeException(err.toString)))
})
}
// associate the new backup (results._1) with the same project as the source file (results._2)
_ <- makeProjectLink(results._2, results._1)
} yield results
}
/**
 * Finds the most recently modified backup according to the actual filesystem metadata
 * (rather than the database records).
 * Backups whose storage driver or metadata is unavailable are skipped with an error log.
 * @param potentialBackups backup FileEntry records to inspect
 * @param p owning project (used for logging)
 * @param storageDrivers cached map of StorageDrivers
 * @return an Option of (FileEntry, StorageMetadata) for the newest backup, or None if no metadata could be read
 */
private def findMostRecentByFilesystem(potentialBackups: Seq[FileEntry], p: ProjectEntry, storageDrivers: Map[Int, StorageDriver]) = {
  // pair each backup with its on-disk metadata, dropping the ones that cannot be read
  val withMetadata = potentialBackups.flatMap { backup =>
    storageDrivers.get(backup.storageId) match {
      case Some(destDriver) =>
        val bMeta = destDriver.getMetadata(backup.filepath, backup.version)
        logger.debug(s"Project ${p.projectTitle} (${p.id}) backup file ${backup.filepath} v${backup.version} metadata: $bMeta")
        bMeta.map(meta => (backup, meta))
      case None =>
        logger.error(s"Project ${p.projectTitle} (${p.id}) Could not get a destination driver for storage ${backup.storageId} on file ${backup.filepath} v${backup.version}")
        None
    }
  }
  // most recent first
  val ordered = withMetadata.sortBy(_._2.lastModified.toInstant.toEpochMilli)(Ordering.Long.reverse)
  ordered.foreach { case (entry, meta) =>
    logger.debug(s"Ordered entry: ${entry.filepath} ${entry.version} ${meta.lastModified} ${meta.size}")
  }
  ordered.headOption
}
/**
 * Checks whether the given project file needs backing up
 * @param sourceFile FileEntry representing the file to potentially be backed up
 * @param potentialBackups **sorted** list of FileEntry representing the existing backups of the given sourceFile.
 *                         It is assumed that this is sorted in descending time order, i.e. most recent first and oldest last.
 * @param p the ProjectEntry representing the project that the files belong to (for logging purposes)
 * @param storageDrivers cached Map of StorageDrivers, so we don't initialise a new one on every file
 * @return Success with a Right if a backup is required or a Left if no backup is required.
 *         In the "Backup Required" case, the Right could contain a FileEntry or None. If it's a FileEntry, that is
 *         representing the _most recent_ backup (which is now out-dated).
 *         If there is an error (no driver for the source storage, or the source file's metadata cannot be read
 *         when a size comparison is required), a Failure is returned.
 */
def validateExistingBackups(sourceFile: FileEntry, potentialBackups: Seq[FileEntry], p: ProjectEntry, storageDrivers: Map[Int, StorageDriver]): Try[Either[String, Option[FileEntry]]] = {
  storageDrivers.get(sourceFile.storageId) match {
    case None =>
      logger.error(s"Project ${p.projectTitle} (${p.id}) Could not get a storage driver for storage ${sourceFile.storageId} on file ${sourceFile.filepath}")
      Failure(new RuntimeException(s"Could not get a storage driver for storage ${sourceFile.storageId} on file ${sourceFile.filepath}"))
    case Some(sourceDriver) =>
      val sourceMeta = sourceDriver.getMetadata(sourceFile.filepath, sourceFile.version)
      logger.debug(s"Project ${p.projectTitle} (${p.id}) source file ${sourceFile.filepath} v${sourceFile.version} metadata: $sourceMeta")
      findMostRecentByFilesystem(potentialBackups, p, storageDrivers) match {
        case None =>
          logger.info(s"Project ${p.projectTitle} (${p.id}) most recent metadata was empty, could not check sizes. Assuming that the file is not present and needs backup")
          Success(Right(None)) //signal needs backup
        case Some((fileEntry, meta)) =>
          logger.info(s"Project ${p.projectTitle} (${p.id}) Most recent backup leads source file by ${getTimeDifference(sourceMeta, meta)}")
          sourceMeta match {
            case None =>
              // Fixed: previously `sourceMeta.get` threw NoSuchElementException straight out of
              // the method when the source metadata was unreadable, violating the Try contract.
              logger.error(s"Project ${p.projectTitle} (${p.id}) Could not read metadata for source file ${sourceFile.filepath} v${sourceFile.version} on storage ${sourceFile.storageId}")
              Failure(new RuntimeException(s"Could not read metadata for source file ${sourceFile.filepath} on storage ${sourceFile.storageId}"))
            case Some(srcMeta) if meta.size == srcMeta.size =>
              logger.info(s"Project ${p.projectTitle} (${p.id}) Most recent backup version ${fileEntry.version} matches source, no backup required")
              Success(Left("No backup required"))
            case Some(srcMeta) =>
              logger.info(s"Project ${p.projectTitle} (${p.id}) Most recent backup version ${fileEntry.version} size mismatch ${srcMeta.size} vs ${meta.size}, backup needed")
              Success(Right(Some(fileEntry)))
          }
      }
  }
}
/**
 * Checks whether we should back up the given file
 * @param projectAndFiles tuple consisting of the ProjectEntry and a list of all its files
 * @param storageDrivers internal, immutable StorageDrivers cache
 * @param storages cached map relating storage ID to the full StorageEntry record
 * @return a Future containing:
 * - a Left if there was a problem and the file could not be backed up but the backup job should continue
 * - a Right with `true` if the file was backed up
 * - a Right with `false` if the file did not need backing up
 * The returned future fails on a permanent error, this should be picked up with .recover and the backup job terminated
 */
def conditionalBackup(projectAndFiles:(ProjectEntry, Seq[FileEntry]), storageDrivers:Map[Int, StorageDriver], storages:Map[Int, StorageEntry]):Future[Either[String, Boolean]] = {
val p = projectAndFiles._1
// only "live" (non-backup) files are candidates for backing up
val nonBackupFiles = projectAndFiles._2.filter(_.backupOf.isEmpty)
if(nonBackupFiles.isEmpty) {
logger.warn(s"Project ${p.projectTitle} (${p.id}) has no current file")
Future(Left(s"Project ${p.projectTitle} (${p.id}) has no current file"))
} else {
// multiple non-backup files is unexpected; log them all but proceed with the first
if(nonBackupFiles.length>1) {
logger.warn(s"Project ${p.projectTitle} (${p.id}) has multiple non-backup files:")
nonBackupFiles.foreach(f=>logger.warn(s"\t${p.projectTitle} (${p.id}) ${f.filepath} on storage ${f.storageId}"))
}
val sourceFile = nonBackupFiles.head
// compare the source against its existing backups, most-recent-version first
validateExistingBackups(
sourceFile,
projectAndFiles._2.filter(_.backupOf.isDefined).sortBy(_.version)(Ordering.Int.reverse),
p,
storageDrivers
) match {
case Failure(err)=>
Future.failed(err)
case Success(Left(msg))=>
logger.info(s"Project ${p.projectTitle} (${p.id}): $msg")
Future(Right(false))
case Success(Right(maybeMostRecentBackup))=>
logger.debug(s"I will back up project ${p.projectTitle} (${p.id})")
// resolve the backup storage configured for the source file's storage
val maybeDestStorage = for {
sourceStorage <- storages.get(sourceFile.storageId)
destStorageId <- sourceStorage.backsUpTo
destStorage <- storages.get(destStorageId)
} yield destStorage
maybeDestStorage match {
case None=>
Future(
Left(s"Cannot back up ${p.projectTitle} (${p.id}) because either the source file id ${sourceFile.storageId} is not valid or there is no backup storage configured for it")
)
case Some(destStorage)=>
// guard against a self-referencing backup configuration, which could destroy the source
if(sourceFile.storageId==destStorage.id.get) {
Future.failed(new RuntimeException(s"Cannot back up ${p.projectTitle} (${p.id}) because storage ${sourceFile.storageId} is configured to back up to itself. This is not supported and can lead to data loss, please fix."))
} else {
performBackup(sourceFile, maybeMostRecentBackup, destStorage)
.map(_=>Right(true))
.recover({
case err: Throwable =>
Left(s"Cannot back up ${p.projectTitle} (${p.id}) because ${err.getMessage} occurred while copying ${sourceFile.filepath} v${sourceFile.version} from storage ${sourceFile.storageId}")
})
}
}
}
}
}
/**
 * Deletes all zero-length backups for the given project. This deletes the files from disk via the storage driver,
 * deletes the ProjectFileAssociation and the FileEntry associated with the dodgy backup file.
 * Note that backups on storage id 2 are currently skipped (hard-coded exclusion, see below).
 * @param projectAndFiles a 2-tuple consisting of the ProjectEntry representing the project and a list of all the
 *                        FileEntry objects associated with it
 * @param storageDrivers cached map of StorageDrivers, so we don't have to initialise a new one every time
 * @return a successful Future if all the invalid backups are removed or there were none to remove. A Failed future if
 *         there is a problem.
 */
def fixInvalidBackupsFor(projectAndFiles:(ProjectEntry, Seq[FileEntry]), storageDrivers:Map[Int, StorageDriver]):Future[Seq[Unit]] = {
val p = projectAndFiles._1
val backupFiles = projectAndFiles._2.filter(_.backupOf.isDefined)
// first pass: identify backups whose on-disk size is zero
val zeroLengthBackups = backupFiles.filter(fileEntry=>{
storageDrivers.get(fileEntry.storageId) match {
case None=>
logger.error(s"Could not get a storage driver for ${fileEntry.filepath} on storage id ${fileEntry.storageId}")
false
case Some(driver)=>
driver.getMetadata(fileEntry.filepath, fileEntry.version) match {
case None=>
// no metadata means we cannot prove it is zero-length, so leave it alone
logger.error(s"Could not get metadata for ${fileEntry.filepath} v ${fileEntry.version} on storage id ${fileEntry.storageId} with driver ${driver.getClass.getCanonicalName}")
false
case Some(meta)=>
if(meta.size==0) {
logger.info(s"Found dodgy backup: $meta")
}
meta.size==0
}
}
})
logger.info(s"Project ${p.projectTitle} (${p.id}) has ${zeroLengthBackups.length} zero-length backups")
// second pass: delete each zero-length backup from disk, then its database record
Future.sequence(
zeroLengthBackups.map(fileEntry=>{
storageDrivers.get(fileEntry.storageId) match {
case None=>
logger.error(s"Could not get a storage driver for ${fileEntry.filepath} on storage id ${fileEntry.storageId}")
Future.failed(new RuntimeException("Could not get a storage driver on the second pass, this should not happen!"))
case Some(driver)=>
// NOTE(review): storage id 2 is excluded by a hard-coded "TEMPORARY HACK" —
// this should be made configurable or removed; confirm why it exists before changing
if(fileEntry.storageId!=2) { //TEMPORARY HACK
logger.info(s"Deleting zero-length backup ${fileEntry.filepath} on storage id ${fileEntry.storageId}")
if (driver.deleteFileAtPath(fileEntry.filepath, fileEntry.version)) {
logger.info(s"Deleting zero-length backup entry ${fileEntry.id}")
fileEntry.deleteSelf
} else {
Future.failed(new RuntimeException(s"Could not delete file ${fileEntry.filepath} on storage id ${fileEntry.storageId}"))
}
} else {
Future( () )
}
}
})
)
}
/**
 * Scans every project in the system and deletes any zero-length backup files found,
 * delegating the per-project work to fixInvalidBackupsFor. Projects are processed one at a time.
 * @return a Future of BackupResults whose successCount is the total number of backups removed.
 *         The Future fails if any project cannot be processed.
 */
def nukeInvalidBackups:Future[BackupResults] =
  // Note: the unused `backup.parallelCopies` read was removed; this job is deliberately serial (mapAsync(1)).
  loadStorageDrivers().flatMap(drivers =>
    ProjectEntry
      .scanAllProjects
      .map(p => {
        logger.info(s"Checking project ${p.projectTitle} for invalid backups")
        p
      })
      .mapAsync(1)(p => p.associatedFiles(allVersions = true).map((p, _)))
      .map(projectAndFiles => {
        val p = projectAndFiles._1
        val f = projectAndFiles._2
        val backupsCount = f.count(_.backupOf.isDefined)
        logger.info(s"Project ${p.projectTitle} has ${f.length} files of which $backupsCount are backups")
        projectAndFiles
      })
      .mapAsync(1)(projectAndFiles => fixInvalidBackupsFor(projectAndFiles, drivers))
      // fold the per-project deletion lists into a single removed-backups count
      .toMat(Sink.fold(BackupResults.empty(0))((acc, results) => {
        acc.copy(successCount = acc.successCount + results.length)
      }))(Keep.right)
      .run()
  )
/**
 * Runs a backup pass over projects, backing up any project file whose most recent backup is
 * missing or differs in size from the source.
 * @param onlyInProgress if true, only scan projects that are In Production and match the configured
 *                       project types; if false, scan every project in the system
 * @return a Future of BackupResults totalling successful, failed and not-needed backups
 */
def backupProjects(onlyInProgress:Boolean):Future[BackupResults] = {
  val parallelCopies = config.getOptional[Int]("backup.parallelCopies").getOrElse(1)
  // Project-type ids to include when scanning in-production projects; defaults to types 1 and 5.
  // Fixed: the previous `var` + `.getOrElse(None)` construction widened the value's type and
  // relied on a `!= None` comparison; a single immutable val expresses the same default cleanly.
  val backupTypes = config.getOptional[Seq[Int]]("backup_types").map(_.toArray).getOrElse(Array(1, 5))
  def getScanSource() = if (onlyInProgress) {
    ProjectEntry.scanProjectsForStatusAndTypes(EntryStatus.InProduction, backupTypes)
  } else {
    ProjectEntry.scanAllProjects
  }
  for {
    drivers <- loadStorageDrivers()
    storages <- loadAllStorages()
    result <- getScanSource()
      .map(p => {
        logger.debug(s"Checking project ${p.projectTitle} for backup")
        p
      })
      .mapAsync(parallelCopies)(p => p.associatedFiles(allVersions = true).map((p, _)))
      .map(projectAndFiles => {
        val p = projectAndFiles._1
        val f = projectAndFiles._2
        val backupsCount = f.count(_.backupOf.isDefined)
        logger.debug(s"Project ${p.projectTitle} has ${f.length} files of which $backupsCount are backups")
        projectAndFiles
      })
      .mapAsync(parallelCopies)(projectAndFiles => conditionalBackup(projectAndFiles, drivers, storages))
      // tally each per-file outcome into the running totals
      .toMat(Sink.fold(BackupResults.empty(0))((acc, elem) => elem match {
        case Left(errs) =>
          logger.warn(s"Backup failed: ${errs}")
          acc.copy(totalCount = acc.totalCount + 1, failedCount = acc.failedCount + 1)
        case Right(true) =>
          acc.copy(totalCount = acc.totalCount + 1, successCount = acc.successCount + 1)
        case Right(false) =>
          acc.copy(totalCount = acc.totalCount + 1, notNeededCount = acc.notNeededCount + 1)
      }))(Keep.right)
      .run()
  } yield result
}
/**
 * Deletes all audio backups for the given project. This deletes the files from disk via the storage driver,
 * deletes the ProjectFileAssociation and the FileEntry associated with the backup file.
 * Only Cubase (.cpr) and Audition (.sesx) session files are treated as audio backups, and
 * backups residing on storage id 2 are skipped (pre-existing hard-coded exclusion).
 * @param projectAndFiles A 2-tuple consisting of the ProjectEntry representing the project and a list of all the
 *                        FileEntry objects associated with it
 * @param storageDrivers Cached map of StorageDrivers, so we don't have to initialise a new one every time
 * @return A successful Future if all the audio backups are removed or there were none to remove. A Failed future if
 *         there is a problem.
 */
def deleteAudioBackupsFor(projectAndFiles: (ProjectEntry, Seq[FileEntry]), storageDrivers: Map[Int, StorageDriver]): Future[Seq[Unit]] = {
  val (project, allFiles) = projectAndFiles
  val backupFiles = allFiles.filter(_.backupOf.isDefined)
  // keep only Cubase / Audition session files, logging each one that matches
  val audioBackups = backupFiles.filter { fileEntry =>
    if (fileEntry.filepath.endsWith(".cpr")) {
      logger.debug(s"Found Cubase file: ${fileEntry.filepath}")
      true
    } else if (fileEntry.filepath.endsWith(".sesx")) {
      logger.debug(s"Found Audition file: ${fileEntry.filepath}")
      true
    } else {
      false
    }
  }
  logger.info(s"Project ${project.projectTitle} (${project.id.get}) has ${audioBackups.length} audio backups")
  Future.sequence(
    audioBackups.map { fileEntry =>
      storageDrivers.get(fileEntry.storageId) match {
        case None =>
          logger.error(s"Could not get a storage driver for ${fileEntry.filepath} on storage id ${fileEntry.storageId}")
          Future.failed(new RuntimeException("Could not get a storage driver on the second pass, this should not happen!"))
        case Some(driver) if fileEntry.storageId != 2 =>
          logger.info(s"Deleting backup ${fileEntry.filepath} on storage id ${fileEntry.storageId}")
          if (driver.deleteFileAtPath(fileEntry.filepath, fileEntry.version)) {
            logger.info(s"Deleting backup entry ${fileEntry.id}")
            fileEntry.deleteSelf
          } else {
            // a failed disk delete is logged but does not abort the rest of the job
            logger.error(s"Could not delete file ${fileEntry.filepath} on storage id ${fileEntry.storageId}")
            Future( () )
          }
        case Some(_) =>
          // storage id 2 is excluded from deletion
          Future( () )
      }
    }
  )
}
/**
 * Scans every project in the system and removes its audio-session backups via
 * deleteAudioBackupsFor. Projects are processed strictly one at a time.
 * @return a Future of BackupResults whose successCount is the total number of backups removed.
 *         The Future fails if any project cannot be processed.
 */
def deleteAudioBackups: Future[BackupResults] =
  loadStorageDrivers().flatMap { drivers =>
    ProjectEntry
      .scanAllProjects
      .map { project =>
        logger.info(s"Checking project ${project.projectTitle} for backups")
        project
      }
      .mapAsync(1)(project => project.associatedFiles(allVersions = true).map(files => (project, files)))
      .map { case (project, files) =>
        val backupsCount = files.count(_.backupOf.isDefined)
        logger.info(s"Project ${project.projectTitle} has ${files.length} files of which $backupsCount are backups")
        (project, files)
      }
      .mapAsync(1)(projectAndFiles => deleteAudioBackupsFor(projectAndFiles, drivers))
      // fold the per-project deletion lists into a single removed-backups count
      .toMat(Sink.fold(BackupResults.empty(0))((totals, removed) =>
        totals.copy(successCount = totals.successCount + removed.length)
      ))(Keep.right)
      .run()
  }
}
object NewProjectBackup {
  /**
   * Summary counters produced by a backup or cleanup run.
   * @param storageId id of the storage the run relates to (0 when not storage-specific)
   * @param totalCount number of files considered
   * @param failedCount number of files whose backup failed
   * @param successCount number of files successfully backed up / removed
   * @param notNeededCount number of files that did not need backing up
   */
  case class BackupResults(storageId:Int, totalCount:Long, failedCount:Long, successCount:Long, notNeededCount:Long)

  object BackupResults {
    /** A zeroed-out result set for the given storage. */
    def empty(storageId: Int) =
      BackupResults(storageId = storageId, totalCount = 0, failedCount = 0, successCount = 0, notNeededCount = 0)
  }
}