def runVersionScanner()

in app/controllers/PremiereVersionConverter.scala [119:178]


  /**
    * Scans every project of the given type, runs ExtractPremiereVersion over the files associated with
    * each project and saves the discovered Premiere version number back onto the matching file records.
    *
    * Per-file outcomes:
    *  - postrun succeeded and yielded an integer "premiere_version": the file record is updated and saved.
    *  - postrun succeeded but yielded no "premiere_version": the whole scan is aborted; this should not
    *    happen and indicates a bug.
    *  - postrun succeeded but the "premiere_version" value is not a valid integer: a warning is logged and
    *    the file is skipped, so one bad file does not abort the scan of all remaining projects.
    *  - postrun failed: a warning is logged and the file is skipped.
    *
    * @param projectTypeId id of the project type whose projects should be scanned
    * @return a Future that completes once the stream has finished. A failure of the scan is logged and
    *         recovered here, so the returned Future does not itself carry the error.
    */
  def runVersionScanner(projectTypeId:Int) = {
    val xtractor = new ExtractPremiereVersion

    ProjectEntry
      .scanProjectsForType(projectTypeId) //step 1 - stream out projects
      .mapAsync(2)(p=>p.associatedFiles(allVersions=false).map(files=>(p, files)))  //step 2 - now pick up the file associations for each of them. allVersions = false => only default storage.
      .mapAsync(1)({ case (project, files)=> //step 3 - get the full filepath for each of the listed files and add it into the data stream
        Future
          .sequence(files.map(f=>fileEntryDAO.getFullPath(f)))
          .map(filePaths=>(project, filePaths zip files))
      })
      .mapAsync(1)({ case (project, filePaths)=> //step 4 - now comes the real action....
        for {
          //run the ExtractPremiereVersion postrun across all of the listed files.
          //map the output into a tuple list consisting of the postrun output (success/failure) and the incoming data
          //NOTE(review): null is passed as the third postrun argument - confirm ExtractPremiereVersion never dereferences it
          extractedInfo <- Future.sequence(filePaths.map({ case (filePath, _)=>
            xtractor.postrun(filePath, project, null, PostrunDataCache(), None, None)
          })).map(_ zip filePaths)
          //now iterate through those results; see the method documentation for how each outcome is handled.
          //This is wrapped in a Future so that the deliberate exception below fails the stream element
          results <- Future(extractedInfo.map({
            case (Success(updatedCache), (filePath, fileEntry))=>
              updatedCache.get("premiere_version") match {
                case Some(updatedVersion)=>
                  logger.info(s"$filePath: Internal premiere version number is $updatedVersion")
                  //a malformed version must not take down the whole scan, so parse defensively
                  scala.util.Try(updatedVersion.toInt).toOption match {
                    case Some(versionNumber)=>
                      Some(fileEntry.copy(maybePremiereVersion = Some(versionNumber)))
                    case None=>
                      logger.warn(s"$filePath: premiere version '$updatedVersion' is not a valid integer, not updating this file")
                      None
                  }
                case None=>
                  throw new RuntimeException(s"$filePath: ExtractPremiereVersion completed successfully but did not return any info, this should not happen")
              }
            case (Failure(err), (filePath, _))=>
              logger.warn(s"Could not get premiere version from $filePath: ${err.getClass.getCanonicalName} ${err.getMessage}", err)
              None
          }))
        } yield results
      })
      .map(_.collect({case Some(fileEntry)=>fileEntry}))  //step 5 - filter out unsuccessful attempts
      .mapAsync(4)(files=>{ //step 6 - write the modified records back to the database
        Future.sequence(
          files.map(f=>{
            logger.info(s"Saving updated record for ${f.id} ${f.filepath}")
            fileEntryDAO.saveSimple(f)
          })
        )
      })
      .toMat(Sink.ignore)(Keep.right)
      .run()
      .map(_=>{
        logger.info("Premiere version scan is now complete")
      })
      .recover({
        //log here and swallow the failure, so callers always receive a completed Future
        case err:Throwable=>
          logger.error(s"Premiere version scan failed: ${err.getClass.getCanonicalName} ${err.getMessage}", err)
      })
  }