def deleteDataRunner()

in app/controllers/ProjectEntryController.scala [654:1004]


  def deleteDataRunner(projectId: Int, delay: Int, pluto: Boolean, file: Boolean, backups: Boolean, pTR: Boolean, deliverables: Boolean, sAN: Boolean, matrix: Boolean, s3: Boolean, buckets: Array[String], bucketBooleans: Array[Boolean]): Unit = {
    def deleteFileJob() = Future {
      if (file) {
        implicit val db = dbConfig.db
        ProjectEntry.entryForId(projectId).map({
          case Success(projectEntry: ProjectEntry) =>
            projectEntry.associatedFiles(false).map(fileList => {
              fileList.map(entry => {
                logger.info(s"Attempting to delete the file at: ${entry.filepath}")
                fileEntryDAO
                  .deleteFromDisk(entry)
                  .andThen(_ => fileEntryDAO.deleteRecord(entry))
                if(entry.filepath.endsWith(".cpr")) {
                  db.run(
                    TableQuery[ProjectMetadataRow]
                      .filter(_.key===ProjectMetadata.ASSET_FOLDER_KEY)
                      .filter(_.projectRef===projectId)
                      .result
                  ).map(results=>{
                    val resultCount = results.length
                    if(resultCount==0){
                      logger.info(s"No asset folder registered for that project id.")
                    } else {
                      logger.info(s"Found the asset folder at: ${results.head.value.get} Attempting to delete any Cubase files present." )
                      for {
                        files <- Option(new File(results.head.value.get).listFiles)
                        file <- files if file.getName.endsWith(".cpr")
                      } file.delete()
                    }
                  }).recover({
                    case err: Throwable =>
                      logger.error(s"Could not look up asset folder for project id $projectId: ", err)
                  })
                }
                })
              }
            )
          case Failure(error) =>
            logger.error(s"Could not look up project entry for ${projectId}: ", error)
        })
      }
    }

    def deleteBackupsJob() = Future {
      if (backups) {
        implicit val db = dbConfig.db
        ProjectEntry.entryForId(projectId).map({
          case Success(projectEntry: ProjectEntry) =>
            logger.info(s"About to attempt to delete any backups present for project ${projectId}")
            projectEntry.associatedFiles(true).map(fileList => {
              fileList.map(entry => {
                entry.backupOf match {
                  case Some(value) =>
                    logger.info(s"Attempting to delete the file at: ${entry.filepath}")
                    fileEntryDAO
                      .deleteFromDisk(entry)
                      .andThen(_ => fileEntryDAO.deleteRecord(entry))
                  case None =>
                    logger.info(s"Ignoring non-backup file at ${entry.filepath}")
                }
              })
            }
            )
            projectEntry.associatedAssetFolderFiles(true, implicitConfig).map(fileList => {
              fileList.map(entry => {
                if (entry.storageId == config.get[Int]("asset_folder_backup_storage")) {
                  logger.info(s"Attempting to delete the file at: ${entry.filepath}")
                  assetFolderFileEntryDAO
                    .deleteFromDisk(entry)
                    .andThen(_ => assetFolderFileEntryDAO.deleteRecord(entry))
                } else {
                  logger.info(s"Ignoring non-backup file at ${entry.filepath}")
                }
              })
            }
            )
          case Failure(error) =>
            logger.error(s"Could not look up project entry for ${projectId}: ", error)
        })
      }
    }

    val xtensionXtractor="^(.*)\\.([^.]+)$".r

    def removeProjectFileExtension(projectFileName:String) = projectFileName match {
      case xtensionXtractor(barePath,_)=>barePath
      case _=>
        logger.warn(s"The project file '$projectFileName' does not appear to have a file extension")
        projectFileName
    }

    def deletePTRJob() = Future {
      if (pTR) {
        implicit val db = dbConfig.db
        ProjectEntry.entryForId(projectId).map({
          case Success(projectEntry: ProjectEntry) =>
            projectEntry.associatedFiles(false).map(fileList => {
              fileList.map(entry => {
                fileEntryDAO
                  .storage(entry)
                  .andThen({
                    case Success(storageTry) =>
                      storageTry match {
                        case Some(storage) =>
                          val targetFilePath = storage.rootpath.get + "/" + removeProjectFileExtension(entry.filepath) + ".ptr"
                          logger.info(s"Attempting to delete a possible file at: ${targetFilePath}")
                          new File(targetFilePath).delete()
                        case None =>
                          logger.info(s"Attempt at loading storage data failed.")
                      }
                    case Failure(err)=>
                      logger.error(s"Attempt at loading storage data failed.", err)
                  })
              })
            }
            )
          case Failure(error) =>
            logger.error(s"Could not look up project entry for ${projectId}: ", error)
        })
      }
    }

    def deleteDeliverables() = Future {
      if (deliverables) {
        rabbitMqDeliverable ! DeliverableEvent(projectId)
      }
    }

    def deleteS3() = Future {
      if (s3) {
        for((bucket,i) <- buckets.view.zipWithIndex) {
          if (bucketBooleans(i)) {
            val assetFolderString = Await.result(assetFolderForProject(projectId), Duration.Inf).toString
            logger.info(s"Asset folder for project: $assetFolderString")
            if (assetFolderString == "") {
              logger.warn(s"No asset folder found for project. Can not attempt to delete data from S3.")
            } else {
              implicit lazy val s3helper: S3Helper = helpers.S3Helper.createFromBucketName(bucket).toOption.get
              val assetFolderBasePath = config.get[String]("postrun.assetFolder.basePath")
              val keyForSearch = assetFolderString.replace(s"$assetFolderBasePath/", "")
              if (!keyForSearch.matches(".*?\\/.*?\\/.*?\\_.*?")) {
                logger.warn(s"Key for search does not match the expected format. Can not attempt to delete data from S3.")
              } else {
                logger.info(s"About to attempt to delete any data in the S3 bucket: ${bucket}")
                val bucketObjectData = s3helper.listBucketObjects(keyForSearch)
                for (s3Object <- bucketObjectData) {
                  if (s"$keyForSearch/" != s3Object.key) {
                    logger.info(s"Found S3 key: ${s3Object.key}")
                    val objectVersions = s3helper.listObjectsVersions(s3Object)
                    for (version <- objectVersions) {
                      logger.info(s"Found version: ${version.versionId()} for key: ${version.key()}")
                      val deleteOutcome = s3helper.deleteObject(s3Object, version.versionId())
                      logger.info(s"Delete response was: $deleteOutcome")
                    }
                  }
                }
                val bucketObjectDataFolder = s3helper.listBucketObjects(keyForSearch)
                for (s3Object <- bucketObjectDataFolder) {
                  if (s"$keyForSearch/" == s3Object.key) {
                    logger.info(s"Found S3 key: ${s3Object.key}")
                    val objectVersionsFolder = s3helper.listObjectsVersions(s3Object)
                    for (version <- objectVersionsFolder) {
                      logger.info(s"Found version: ${version.versionId()} for key: ${version.key()}")
                      val deleteOutcomeFolder = s3helper.deleteObject(s3Object, version.versionId())
                      logger.info(s"Delete response was: $deleteOutcomeFolder")
                    }
                  }
                }
              }
            }
          }
        }
      }
    }

    def onlineFilesByProject(vidispineCommunicator: VidispineCommunicator, projectId: Int): Future[Seq[OnlineOutputMessage]] = {
      vidispineCommunicator.getFilesOfProject(projectId)
        .map(_.filterNot(isBranding).map(InternalOnlineOutputMessage.toOnlineOutputMessage))
    }

    def isBranding(item: VSOnlineOutputMessage): Boolean = item.mediaCategory.toLowerCase match {
      case "branding" => true // Case insensitive
      case _ => false
    }

    def deleteSAN() = Future {
      if (sAN) {
        Thread.sleep(delay)
        logger.info(s"About to attempt to delete any SAN data present for project ${projectId}")
        implicit val db = dbConfig.db
        DeleteJobDAO.getOrCreate(projectId, "Started")
        lazy val vidispineConfig = VidispineConfig.fromEnvironment.toOption.get
        implicit lazy val executionContext = new MdcExecutionContext(
          ExecutionContext.fromExecutor(
            Executors.newWorkStealingPool(10)
          )
        )
        implicit lazy val actorSystem:ActorSystem = ActorSystem("pluto-core-delete", defaultExecutionContext=Some(executionContext))
        implicit lazy val mat:Materializer = Materializer(actorSystem)
        implicit lazy val vidispineCommunicator = new VidispineCommunicator(vidispineConfig)
        val vidispineMethodOut = Await.result(onlineFilesByProject(vidispineCommunicator, projectId), 120.seconds)
        vidispineMethodOut.map(onlineOutputMessage => {
          if (onlineOutputMessage.projectIds.length > 2) {
            logger.info(s"Refusing to attempt to delete Vidispine item ${onlineOutputMessage.vidispineItemId.get} as it is referenced by more than one project.")
            ItemDeleteDataDAO.getOrCreate(projectId, onlineOutputMessage.vidispineItemId.get)
          } else {
            logger.info(s"About to attempt to send a message to delete Vidispine item ${onlineOutputMessage.vidispineItemId.get}")
            rabbitMqSAN ! SANEvent(onlineOutputMessage)
          }
        })
        Thread.sleep(1000)
        DeleteJobDAO.getOrCreate(projectId, "Finished")
      }
    }

    def nearlineFilesByProject(vault: Vault, projectId: String): Future[Seq[OnlineOutputMessage]] = {
      val sinkFactory = Sink.seq[OnlineOutputMessage]
      Source.fromGraph(new OMFastContentSearchSource(vault,
        s"""GNM_PROJECT_ID:\"$projectId\"""",
        Array("MXFS_PATH", "MXFS_FILENAME", "GNM_PROJECT_ID", "GNM_TYPE", "__mxs__length")
      )
      ).filterNot(isBrandingMatrix)
        .map(InternalOnlineOutputMessage.toOnlineOutputMessage)
        .toMat(sinkFactory)(Keep.right)
        .run()
    }

    def isBrandingMatrix(entry: ObjectMatrixEntry): Boolean = entry.stringAttribute("GNM_TYPE") match {
      case Some(gnmType) =>
        gnmType.toLowerCase match {
          case "branding" => true // Case insensitive
          case _ => false
        }
      case _ => false
    }

    def searchAssociatedNearlineMedia(projectId: Int, vault: Vault): Future[Seq[OnlineOutputMessage]] = {
      nearlineFilesByProject(vault, projectId.toString)
    }

    def getNearlineResults(projectId: Int, nearlineVaultId: String, matrixStore: MXSConnectionBuilderImpl): Future[Either[String, Seq[OnlineOutputMessage]]] =
      matrixStore.withVaultFuture(nearlineVaultId) { vault =>
        searchAssociatedNearlineMedia(projectId, vault).map(Right.apply)
      }

    def deleteMatrix() = Future {
      if (matrix) {
        Thread.sleep(delay)
        logger.info(s"About to attempt to delete any Object Matrix data present for project ${projectId}")
        implicit val db = dbConfig.db
        MatrixDeleteJobDAO.getOrCreate(projectId, "Started")
        lazy val matrixStoreConfig = new MatrixStoreEnvironmentConfigProvider().get() match {
          case Left(err)=>
            logger.error(s"Could not initialise due to incorrect matrix-store config: $err")
            sys.exit(1)
          case Right(config)=>config
        }
        implicit lazy val executionContext = new MdcExecutionContext(
          ExecutionContext.fromExecutor(
            Executors.newWorkStealingPool(10)
          )
        )
        implicit lazy val actorSystem:ActorSystem = ActorSystem("pluto-core-delete-matrix", defaultExecutionContext=Some(executionContext))
        implicit lazy val mat:Materializer = Materializer(actorSystem)
        val connectionIdleTime = sys.env.getOrElse("CONNECTION_MAX_IDLE", "750").toInt
        implicit val matrixStore = new MXSConnectionBuilderImpl(
          hosts = matrixStoreConfig.hosts,
          accessKeyId = matrixStoreConfig.accessKeyId,
          accessKeySecret = matrixStoreConfig.accessKeySecret,
          clusterId = matrixStoreConfig.clusterId,
          maxIdleSeconds = connectionIdleTime
        )
        val matrixMethodOut = Await.result(getNearlineResults(projectId, matrixStoreConfig.nearlineVaultId, matrixStore), 120.seconds)
        matrixMethodOut match {
          case Right(nearlineResults) =>
            nearlineResults.map(onlineOutputMessage => {
              if (onlineOutputMessage.projectIds.length > 2) {
                logger.info(s"Refusing to attempt to delete Object Matrix data for object ${onlineOutputMessage.nearlineId.get} as it is referenced by more than one project.")
                MatrixDeleteDataDAO.getOrCreate(projectId, onlineOutputMessage.nearlineId.get)
              } else {
                logger.info(s"About to attempt to send a message to delete Object Matrix data for object ${onlineOutputMessage.nearlineId.get}")
                rabbitMqMatrix ! MatrixEvent(onlineOutputMessage)
              }
            })
          case Left(something) =>
            logger.info(s"No Object Matrix data was found to process.")
        }
        MatrixDeleteJobDAO.getOrCreate(projectId, "Finished")
      }
    }

    def makeDeletionRecord() = Future {
      implicit val db = dbConfig.db

      var user = ""
      val currentDate = new Date()
      val timestampOfNow = new Timestamp(currentDate.getTime)
      var created = timestampOfNow
      var workingGroupName = ""

      ProjectEntry.entryForId(projectId).map({
        case Success(projectEntry: ProjectEntry) =>
          user = projectEntry.user
          created = projectEntry.created
          projectEntry.getWorkingGroup.map({
            case Some(workingGroup: PlutoWorkingGroup) =>
              workingGroupName = workingGroup.name
              DeletionRecordDAO.getOrCreate(projectId, user, timestampOfNow, created, workingGroupName)
            case None =>
              logger.error(s"Could not get working group name for project ${projectId}")
              DeletionRecordDAO.getOrCreate(projectId, user, timestampOfNow, created, "Unknown")
          })
        case Failure(error) =>
          logger.error(s"Could not look up project entry for ${projectId}: ", error)
          Left(error.toString)
      })
    }

    val f = for {
      f1 <- makeDeletionRecord()
      f2 <- deletePTRJob()
      f3 <- deleteFileJob()
      f4 <- deleteBackupsJob()
      f5 <- deleteDeliverables()
      f6 <- deleteS3()
      f7 <- deleteMatrix()
      f8 <- deleteSAN()
    } yield List(f1, f2, f3, f4, f5, f6, f7, f8)
    if (pluto) {
      Thread.sleep(800)
      implicit val db = dbConfig.db
      ProjectMetadata.deleteAllMetadataFor(projectId).map({
        case Success(rows) =>
          logger.info(s"Attempt at removing project metadata worked.")
        case Failure(err) =>
          logger.error(s"Could not delete metadata", err)
      })
      ProjectEntry.entryForId(projectId).map({
        case Success(projectEntry: ProjectEntry) =>
          projectEntry.removeFromDatabase.map({
            case Success(_) =>
              logger.info(s"Attempt at removing project record worked.")
            case Failure(error) =>
              logger.error(s"Attempt at removing project record failed with error: ${error}")
          })
        case Failure(error) =>
          logger.error(s"Could not look up project entry for ${projectId}: ", error)
          Left(error.toString)
      })
    }
  }