in app/controllers/PremiereVersionConverter.scala [119:178]
/**
  * Streams every project of the given type, runs [[ExtractPremiereVersion]] against each of the
  * project's associated files and writes the discovered version back onto the file record.
  *
  * Pipeline, in order:
  *  1. stream the projects returned by `ProjectEntry.scanProjectsForType`
  *  2. look up each project's associated files (`allVersions = false`)
  *  3. resolve the full path of each file through `fileEntryDAO.getFullPath`
  *  4. run the extractor's `postrun` on each path and read `premiere_version` from the returned cache
  *  5. drop the files for which the extractor reported a failure
  *  6. save the updated file records through `fileEntryDAO.saveSimple`
  *
  * A postrun that succeeds without supplying `premiere_version` throws, which fails the stream.
  * A postrun that reports `Failure` is only logged as a warning and that file is skipped.
  *
  * @param projectTypeId id of the project type whose projects should be scanned
  * @return a Future that completes when the stream has finished; completion is logged at info
  *         level and a failure is logged at error level and recovered rather than propagated
  */
def runVersionScanner(projectTypeId:Int) = {
  val versionExtractor = new ExtractPremiereVersion

  val scanStream = ProjectEntry
    .scanProjectsForType(projectTypeId) //step 1 - stream out projects
    .mapAsync(2)(project =>
      //step 2 - pair each project with its file associations. allVersions = false => only default storage.
      project.associatedFiles(allVersions=false).map(files => (project, files))
    )
    .mapAsync(1) { case (project, files) =>
      //step 3 - resolve the full path of every file and carry (path, file) pairs downstream
      val pathLookups = files.map(f => fileEntryDAO.getFullPath(f))
      Future.sequence(pathLookups).map(filePaths => (project, filePaths zip files))
    }
    .mapAsync(1) { case (project, filePaths) =>
      //step 4 - run the ExtractPremiereVersion postrun across all of the listed files
      // NOTE(review): the third postrun argument is null; confirm the extractor never dereferences it
      val postruns = filePaths.map { case (fullPath, _) =>
        versionExtractor.postrun(fullPath, project, null, PostrunDataCache(), None, None)
      }

      Future.sequence(postruns)
        .map(outcomes => outcomes zip filePaths) //keep each postrun outcome next to the (path, file) it came from
        .flatMap(extractedInfo => Future {
          //evaluated inside a Future so that the exception thrown below fails the stream element
          extractedInfo.map { case (outcome, (fullPath, fileEntry)) =>
            outcome match {
              case Failure(err) =>
                //the extractor could not read this file: warn and carry on with the others
                logger.warn(s"Could not get premiere version from ${fullPath}: ${err.getClass.getCanonicalName} ${err.getMessage}", err)
                None
              case Success(updatedCache) =>
                //a successful postrun with no version should not happen and indicates a bug, so abort
                val updatedVersion = updatedCache.get("premiere_version").getOrElse(
                  throw new RuntimeException(s"${fullPath}: ExtractPremiereVersion completed successfully but did not return any info, this should not happen")
                )
                logger.info(s"${fullPath}: Internal premiere version number is $updatedVersion")
                Some(fileEntry.copy(maybePremiereVersion = Some(updatedVersion.toInt)))
            }
          }
        })
    }
    .map(maybeEntries => maybeEntries.flatten) //step 5 - filter out unsuccessful attempts
    .mapAsync(4) { updatedEntries =>
      //step 6 - write the modified records back to the database
      val pendingSaves = updatedEntries.map { entry =>
        logger.info(s"Saving updated record for ${entry.id} ${entry.filepath}")
        fileEntryDAO.saveSimple(entry)
      }
      Future.sequence(pendingSaves)
    }

  //run the stream to completion, discarding the elements, then report the outcome
  scanStream
    .toMat(Sink.ignore)(Keep.right)
    .run()
    .map(_ => logger.info("Premiere version scan is now complete"))
    .recover {
      case err:Throwable =>
        logger.error(s"Premiere version scan failed: ${err.getClass.getCanonicalName} ${err.getMessage}", err)
    }
}