app/services/actors/creation/PostrunExecutor.scala (187 lines of code) (raw):

package services.actors.creation import javax.inject.{Inject, Named} import akka.actor.ActorRef import helpers.{JythonOutput, PostrunDataCache} import models._ import models.messages.{NewAdobeUuid, NewAssetFolder} import org.slf4j.MDC import play.api.Configuration import play.api.db.slick.DatabaseConfigProvider import slick.jdbc.JdbcProfile import scala.concurrent.duration.Duration import scala.concurrent.{Await, Future} import scala.util.{Failure, Success, Try} import scala.concurrent.ExecutionContext.Implicits.global class PostrunExecutor @Inject() (dbConfigProvider:DatabaseConfigProvider, config:Configuration)(implicit fileEntryDAO: FileEntryDAO) extends GenericCreationActor { override val persistenceId = "postrun-executor-actor-" + self.path.name implicit val timeout:Duration = Duration(config.getOptional[String]("postrun.timeout").getOrElse("30 seconds")) import GenericCreationActor._ private implicit val db=dbConfigProvider.get[JdbcProfile].db protected def syncExecScript(action: PostrunAction, projectFileName: String, entry: ProjectEntry, projectType: ProjectType, cache: PostrunDataCache, workingGroupMaybe: Option[PlutoWorkingGroup], commissionMaybe: Option[PlutoCommission]) (implicit db: slick.jdbc.PostgresProfile#Backend#Database, config:Configuration, timeout: Duration):Try[JythonOutput] = Try { Await.result(action.run(projectFileName,entry,projectType,cache, workingGroupMaybe, commissionMaybe), timeout) }.flatten /** * Recursively iterates a list of postrun actions, running each * @param actions list of actions to run * @param results accumulator for results. Initially call this with an empty Seq() * @param cache PostrunDataCache instance for passing data between postruns. Initially call this with PostrunDataCache(). * @param projectFileName file name of the created project * @param projectEntry entry of the created project * @param projectType type of the created project * @param db implicitly passed database object * @param config implicitly passed Play framework configuration * @return ultimate sequence of results */ def runNextAction(actions: Seq[PostrunAction], results:Seq[Try[JythonOutput]], cache: PostrunDataCache, projectFileName: String, projectEntry: ProjectEntry, projectType: ProjectType, workingGroupMaybe:Option[PlutoWorkingGroup],commissionMaybe:Option[PlutoCommission]) (implicit db: slick.jdbc.PostgresProfile#Backend#Database, config:play.api.Configuration, timeout: Duration):Seq[Try[JythonOutput]] = { logger.debug(s"runNextAction: remaining actions: ${actions.toString()}") actions.headOption match { case Some(nextAction)=> //run next action logger.debug(s"running action ${nextAction.toString}") val scriptResult = syncExecScript(nextAction, projectFileName,projectEntry, projectType, cache, workingGroupMaybe, commissionMaybe) val newResults = results ++ Seq(scriptResult) logger.debug(s"got results: ${newResults.toString()}") scriptResult match { case Success(output)=> output.raisedError match { case None=> // no error raised runNextAction(actions.tail, newResults, output.newDataCache, projectFileName, projectEntry, projectType, workingGroupMaybe, commissionMaybe) case Some(scriptError)=> //script ran but failed logger.error(s"Postrun script ${nextAction.runnable} failed: ", scriptError) logger.error("Aborting postruns due to failure") newResults } case Failure(error)=> logger.error(s"Could not start postrun script ${nextAction.runnable}.") logger.error("Aborting postruns due to failure") newResults } case None=> //no more actions left to run logger.debug("recursion ends") results } } /** * recursively searches the result list for the first value of the given key in the datastore * @param reversedResults a Sequence of JythonOutput. Normally reverse this, to get the final value of a key rather than the first. * @return an Option which contains the value, if there is one. */ def locateCacheValue(reversedResults:Seq[JythonOutput],key:String):Option[String] = { if(reversedResults.isEmpty) return None val scalaMap = reversedResults.head.newDataCache.asScala if(scalaMap.contains(key)){ Some(scalaMap(key)) } else { locateCacheValue(reversedResults.tail,key) } } /** * Traverses a sequence of a Try of type A and returns either a Right with all results if they all succeeded or a Left * with all of the errors if any failed. * https://stackoverflow.com/questions/15495678/flatten-scala-try * @param xs - sequence to traverse * @tparam A - type of sequence xs * @return either Left containing a sequence of Throwable or Right containing sequence of A */ protected def collectFailures[A](xs:Seq[Try[A]]):Either[Seq[Throwable],Seq[A]] = Try(Right(xs.map(_.get))).getOrElse(Left(xs.collect({case Failure(err)=>err}))) protected def persistMetadataToDatabase(maybeOutput: Option[JythonOutput], createdProjectEntry:ProjectEntry, successfulActions:Int, writtenPath: String): Future[Either[String, String]] = maybeOutput match { case Some(finalResult)=> if(createdProjectEntry.id.isEmpty) throw new RuntimeException("Created project without id?") val mdSetFuture = ProjectMetadata.setBulk(createdProjectEntry.id.get,finalResult.newDataCache.asScala) mdSetFuture.map({ case Success(count)=> logger.info(s"Set $count metadata fields for project ID ${createdProjectEntry.id}") Right(s"Successfully ran $successfulActions postrun actions for project $writtenPath") case Failure(err)=> logger.error("Could not set metadata for project", err) Left(err.toString) }) case None=> logger.warn(s"No postruns ran for ${createdProjectEntry.projectTitle} (${createdProjectEntry.id.get} so no metadata") Future(Right(s"Successfully ran $successfulActions postrun actions for project $writtenPath")) } /** * If the postruns extracted a premiere version number then store it against the file entry for the project * @param maybeOutput final postrun output with the final state of the data store * @param entry ProjectEntry that was created. This must have the saved project entry on it. * @return a Future, containing a sequence of updated FileEntry results */ protected def storePremiereVersionInfo(maybeOutput: Option[JythonOutput], entry: ProjectEntry) = maybeOutput match { case Some(finalResult)=> finalResult.newDataCache.get("premiere_version") match { case Some(premiereVersion) => Try { premiereVersion.toInt } match { case Success(premiereVersionNumber) => logger.info(s"${entry.id}: ${entry.projectTitle} - got premiere version $premiereVersion") for { files <- entry.associatedFiles(allVersions = false) updates <- Future.sequence(files.map(f => { val updatedFile = f.copy(maybePremiereVersion = Some(premiereVersionNumber)) updatedFile.save })) } yield updates case Failure(err) => logger.error(s"${entry.id}: ${entry.projectTitle} - got premiere version '$premiereVersion' which does not convert to a number. This should not happen. Error was '${err.getMessage}'", err) Future.failed(new RuntimeException("Could not get a valid premiere version, possibly a corrupted template")) } case None => logger.debug(s"${entry.projectTitle} (${entry.id.get} is not a premiere project") Future(Seq()) } case None=> logger.warn(s"No postruns ran for ${entry.projectTitle} (${entry.id.get} so no metadata") Future(Seq()) } override def receive: Receive = { case createRequest:NewProjectRequest=> //FIXME: should validate createRequest here, before entering persistence block doPersistedAsync(createRequest) { (msg, originalSender) => implicit val configImplicit = config val fileEntry = createRequest.data.destFileEntry.get MDC.put("fileEntry", fileEntry.toString) val createdProjectEntry = createRequest.data.createdProjectEntry.get MDC.put("createdProjectEntry", createdProjectEntry.toString) try { val futureSequence = Future.sequence(Seq( createRequest.rq.projectTemplate.projectType, fileEntry.getFullPath, createdProjectEntry.getWorkingGroup, createdProjectEntry.getCommission )) futureSequence.map({ resultSeq => val projectType = resultSeq.head.asInstanceOf[ProjectType] MDC.put("projectType", projectType.toString) val writtenPath = resultSeq(1).asInstanceOf[String] MDC.put("writtenPath", writtenPath) val workingGroupMaybe = resultSeq(2).asInstanceOf[Option[PlutoWorkingGroup]] MDC.put("workingGroupMaybe", workingGroupMaybe.toString) val commissionMaybe = resultSeq(3).asInstanceOf[Option[PlutoCommission]] MDC.put("commissionMaybe", commissionMaybe.toString) val sortedActions = createRequest.data.postrunSequence.get val actionResults: Seq[Try[JythonOutput]] = runNextAction(sortedActions, Seq(), PostrunDataCache(), writtenPath, createdProjectEntry, projectType, workingGroupMaybe, commissionMaybe) val actionSuccess = collectFailures[JythonOutput](actionResults) actionSuccess match { case Left(errorSeq) => MDC.put("errorSeq", errorSeq.toString()) val msg = s"${errorSeq.length} postrun actions failed for project $writtenPath, see log for details" val ex = new RuntimeException(msg) logger.error(msg) errorSeq.foreach(err => logger.error(s"\tMethod failed with:", err)) originalSender ! StepFailed(createRequest.data, ex) Failure(ex) case Right(results) => val scriptErrors = results.filter(_.raisedError.isDefined) if (scriptErrors.nonEmpty) { val ex = new RuntimeException(s"${scriptErrors.length} out of ${results.length} postrun scripts failed for project $writtenPath") originalSender ! StepFailed(createRequest.data, ex) Failure(ex) } else { val msg = s"Successfully ran ${results.length} postrun actions for project $writtenPath" logger.info(s"Successfully ran ${results.length} postrun actions for project $writtenPath") val reversedResults = results.reverse val result = for { _ <- persistMetadataToDatabase(reversedResults.headOption, createdProjectEntry, results.length, writtenPath) result <- storePremiereVersionInfo(reversedResults.headOption, createdProjectEntry) } yield result result.onComplete({ case Success(_)=> originalSender ! StepSucceded(createRequest.data) case Failure(err)=> logger.error(s"Could not persist metadata: ${err.getMessage}", err) originalSender ! StepFailed(createRequest.data, err) }) Success(msg) } } }) } catch { case ex:Throwable=> originalSender ! StepFailed(createRequest.data, ex) Future(Success(ex.toString)) //don't replay the transaction if it failed this way } } case rollbackRequest:NewProjectRollback=> logger.debug("No rollback necessary for this step") sender() ! StepSucceded(rollbackRequest.data) case _=> super.receive } }