in app/controllers/ProjectEntryController.scala [654:1004]
def deleteDataRunner(projectId: Int, delay: Int, pluto: Boolean, file: Boolean, backups: Boolean, pTR: Boolean, deliverables: Boolean, sAN: Boolean, matrix: Boolean, s3: Boolean, buckets: Array[String], bucketBooleans: Array[Boolean]): Unit = {
def deleteFileJob() = Future {
if (file) {
implicit val db = dbConfig.db
ProjectEntry.entryForId(projectId).map({
case Success(projectEntry: ProjectEntry) =>
projectEntry.associatedFiles(false).map(fileList => {
fileList.map(entry => {
logger.info(s"Attempting to delete the file at: ${entry.filepath}")
fileEntryDAO
.deleteFromDisk(entry)
.andThen(_ => fileEntryDAO.deleteRecord(entry))
if(entry.filepath.endsWith(".cpr")) {
db.run(
TableQuery[ProjectMetadataRow]
.filter(_.key===ProjectMetadata.ASSET_FOLDER_KEY)
.filter(_.projectRef===projectId)
.result
).map(results=>{
val resultCount = results.length
if(resultCount==0){
logger.info(s"No asset folder registered for that project id.")
} else {
logger.info(s"Found the asset folder at: ${results.head.value.get} Attempting to delete any Cubase files present." )
for {
files <- Option(new File(results.head.value.get).listFiles)
file <- files if file.getName.endsWith(".cpr")
} file.delete()
}
}).recover({
case err: Throwable =>
logger.error(s"Could not look up asset folder for project id $projectId: ", err)
})
}
})
}
)
case Failure(error) =>
logger.error(s"Could not look up project entry for ${projectId}: ", error)
})
}
}
def deleteBackupsJob() = Future {
if (backups) {
implicit val db = dbConfig.db
ProjectEntry.entryForId(projectId).map({
case Success(projectEntry: ProjectEntry) =>
logger.info(s"About to attempt to delete any backups present for project ${projectId}")
projectEntry.associatedFiles(true).map(fileList => {
fileList.map(entry => {
entry.backupOf match {
case Some(value) =>
logger.info(s"Attempting to delete the file at: ${entry.filepath}")
fileEntryDAO
.deleteFromDisk(entry)
.andThen(_ => fileEntryDAO.deleteRecord(entry))
case None =>
logger.info(s"Ignoring non-backup file at ${entry.filepath}")
}
})
}
)
projectEntry.associatedAssetFolderFiles(true, implicitConfig).map(fileList => {
fileList.map(entry => {
if (entry.storageId == config.get[Int]("asset_folder_backup_storage")) {
logger.info(s"Attempting to delete the file at: ${entry.filepath}")
assetFolderFileEntryDAO
.deleteFromDisk(entry)
.andThen(_ => assetFolderFileEntryDAO.deleteRecord(entry))
} else {
logger.info(s"Ignoring non-backup file at ${entry.filepath}")
}
})
}
)
case Failure(error) =>
logger.error(s"Could not look up project entry for ${projectId}: ", error)
})
}
}
val xtensionXtractor="^(.*)\\.([^.]+)$".r
def removeProjectFileExtension(projectFileName:String) = projectFileName match {
case xtensionXtractor(barePath,_)=>barePath
case _=>
logger.warn(s"The project file '$projectFileName' does not appear to have a file extension")
projectFileName
}
def deletePTRJob() = Future {
if (pTR) {
implicit val db = dbConfig.db
ProjectEntry.entryForId(projectId).map({
case Success(projectEntry: ProjectEntry) =>
projectEntry.associatedFiles(false).map(fileList => {
fileList.map(entry => {
fileEntryDAO
.storage(entry)
.andThen({
case Success(storageTry) =>
storageTry match {
case Some(storage) =>
val targetFilePath = storage.rootpath.get + "/" + removeProjectFileExtension(entry.filepath) + ".ptr"
logger.info(s"Attempting to delete a possible file at: ${targetFilePath}")
new File(targetFilePath).delete()
case None =>
logger.info(s"Attempt at loading storage data failed.")
}
case Failure(err)=>
logger.error(s"Attempt at loading storage data failed.", err)
})
})
}
)
case Failure(error) =>
logger.error(s"Could not look up project entry for ${projectId}: ", error)
})
}
}
def deleteDeliverables() = Future {
if (deliverables) {
rabbitMqDeliverable ! DeliverableEvent(projectId)
}
}
def deleteS3() = Future {
if (s3) {
for((bucket,i) <- buckets.view.zipWithIndex) {
if (bucketBooleans(i)) {
val assetFolderString = Await.result(assetFolderForProject(projectId), Duration.Inf).toString
logger.info(s"Asset folder for project: $assetFolderString")
if (assetFolderString == "") {
logger.warn(s"No asset folder found for project. Can not attempt to delete data from S3.")
} else {
implicit lazy val s3helper: S3Helper = helpers.S3Helper.createFromBucketName(bucket).toOption.get
val assetFolderBasePath = config.get[String]("postrun.assetFolder.basePath")
val keyForSearch = assetFolderString.replace(s"$assetFolderBasePath/", "")
if (!keyForSearch.matches(".*?\\/.*?\\/.*?\\_.*?")) {
logger.warn(s"Key for search does not match the expected format. Can not attempt to delete data from S3.")
} else {
logger.info(s"About to attempt to delete any data in the S3 bucket: ${bucket}")
val bucketObjectData = s3helper.listBucketObjects(keyForSearch)
for (s3Object <- bucketObjectData) {
if (s"$keyForSearch/" != s3Object.key) {
logger.info(s"Found S3 key: ${s3Object.key}")
val objectVersions = s3helper.listObjectsVersions(s3Object)
for (version <- objectVersions) {
logger.info(s"Found version: ${version.versionId()} for key: ${version.key()}")
val deleteOutcome = s3helper.deleteObject(s3Object, version.versionId())
logger.info(s"Delete response was: $deleteOutcome")
}
}
}
val bucketObjectDataFolder = s3helper.listBucketObjects(keyForSearch)
for (s3Object <- bucketObjectDataFolder) {
if (s"$keyForSearch/" == s3Object.key) {
logger.info(s"Found S3 key: ${s3Object.key}")
val objectVersionsFolder = s3helper.listObjectsVersions(s3Object)
for (version <- objectVersionsFolder) {
logger.info(s"Found version: ${version.versionId()} for key: ${version.key()}")
val deleteOutcomeFolder = s3helper.deleteObject(s3Object, version.versionId())
logger.info(s"Delete response was: $deleteOutcomeFolder")
}
}
}
}
}
}
}
}
}
def onlineFilesByProject(vidispineCommunicator: VidispineCommunicator, projectId: Int): Future[Seq[OnlineOutputMessage]] = {
vidispineCommunicator.getFilesOfProject(projectId)
.map(_.filterNot(isBranding).map(InternalOnlineOutputMessage.toOnlineOutputMessage))
}
def isBranding(item: VSOnlineOutputMessage): Boolean = item.mediaCategory.toLowerCase match {
case "branding" => true // Case insensitive
case _ => false
}
def deleteSAN() = Future {
if (sAN) {
Thread.sleep(delay)
logger.info(s"About to attempt to delete any SAN data present for project ${projectId}")
implicit val db = dbConfig.db
DeleteJobDAO.getOrCreate(projectId, "Started")
lazy val vidispineConfig = VidispineConfig.fromEnvironment.toOption.get
implicit lazy val executionContext = new MdcExecutionContext(
ExecutionContext.fromExecutor(
Executors.newWorkStealingPool(10)
)
)
implicit lazy val actorSystem:ActorSystem = ActorSystem("pluto-core-delete", defaultExecutionContext=Some(executionContext))
implicit lazy val mat:Materializer = Materializer(actorSystem)
implicit lazy val vidispineCommunicator = new VidispineCommunicator(vidispineConfig)
val vidispineMethodOut = Await.result(onlineFilesByProject(vidispineCommunicator, projectId), 120.seconds)
vidispineMethodOut.map(onlineOutputMessage => {
if (onlineOutputMessage.projectIds.length > 2) {
logger.info(s"Refusing to attempt to delete Vidispine item ${onlineOutputMessage.vidispineItemId.get} as it is referenced by more than one project.")
ItemDeleteDataDAO.getOrCreate(projectId, onlineOutputMessage.vidispineItemId.get)
} else {
logger.info(s"About to attempt to send a message to delete Vidispine item ${onlineOutputMessage.vidispineItemId.get}")
rabbitMqSAN ! SANEvent(onlineOutputMessage)
}
})
Thread.sleep(1000)
DeleteJobDAO.getOrCreate(projectId, "Finished")
}
}
def nearlineFilesByProject(vault: Vault, projectId: String): Future[Seq[OnlineOutputMessage]] = {
val sinkFactory = Sink.seq[OnlineOutputMessage]
Source.fromGraph(new OMFastContentSearchSource(vault,
s"""GNM_PROJECT_ID:\"$projectId\"""",
Array("MXFS_PATH", "MXFS_FILENAME", "GNM_PROJECT_ID", "GNM_TYPE", "__mxs__length")
)
).filterNot(isBrandingMatrix)
.map(InternalOnlineOutputMessage.toOnlineOutputMessage)
.toMat(sinkFactory)(Keep.right)
.run()
}
def isBrandingMatrix(entry: ObjectMatrixEntry): Boolean = entry.stringAttribute("GNM_TYPE") match {
case Some(gnmType) =>
gnmType.toLowerCase match {
case "branding" => true // Case insensitive
case _ => false
}
case _ => false
}
def searchAssociatedNearlineMedia(projectId: Int, vault: Vault): Future[Seq[OnlineOutputMessage]] = {
nearlineFilesByProject(vault, projectId.toString)
}
def getNearlineResults(projectId: Int, nearlineVaultId: String, matrixStore: MXSConnectionBuilderImpl): Future[Either[String, Seq[OnlineOutputMessage]]] =
matrixStore.withVaultFuture(nearlineVaultId) { vault =>
searchAssociatedNearlineMedia(projectId, vault).map(Right.apply)
}
def deleteMatrix() = Future {
if (matrix) {
Thread.sleep(delay)
logger.info(s"About to attempt to delete any Object Matrix data present for project ${projectId}")
implicit val db = dbConfig.db
MatrixDeleteJobDAO.getOrCreate(projectId, "Started")
lazy val matrixStoreConfig = new MatrixStoreEnvironmentConfigProvider().get() match {
case Left(err)=>
logger.error(s"Could not initialise due to incorrect matrix-store config: $err")
sys.exit(1)
case Right(config)=>config
}
implicit lazy val executionContext = new MdcExecutionContext(
ExecutionContext.fromExecutor(
Executors.newWorkStealingPool(10)
)
)
implicit lazy val actorSystem:ActorSystem = ActorSystem("pluto-core-delete-matrix", defaultExecutionContext=Some(executionContext))
implicit lazy val mat:Materializer = Materializer(actorSystem)
val connectionIdleTime = sys.env.getOrElse("CONNECTION_MAX_IDLE", "750").toInt
implicit val matrixStore = new MXSConnectionBuilderImpl(
hosts = matrixStoreConfig.hosts,
accessKeyId = matrixStoreConfig.accessKeyId,
accessKeySecret = matrixStoreConfig.accessKeySecret,
clusterId = matrixStoreConfig.clusterId,
maxIdleSeconds = connectionIdleTime
)
val matrixMethodOut = Await.result(getNearlineResults(projectId, matrixStoreConfig.nearlineVaultId, matrixStore), 120.seconds)
matrixMethodOut match {
case Right(nearlineResults) =>
nearlineResults.map(onlineOutputMessage => {
if (onlineOutputMessage.projectIds.length > 2) {
logger.info(s"Refusing to attempt to delete Object Matrix data for object ${onlineOutputMessage.nearlineId.get} as it is referenced by more than one project.")
MatrixDeleteDataDAO.getOrCreate(projectId, onlineOutputMessage.nearlineId.get)
} else {
logger.info(s"About to attempt to send a message to delete Object Matrix data for object ${onlineOutputMessage.nearlineId.get}")
rabbitMqMatrix ! MatrixEvent(onlineOutputMessage)
}
})
case Left(something) =>
logger.info(s"No Object Matrix data was found to process.")
}
MatrixDeleteJobDAO.getOrCreate(projectId, "Finished")
}
}
def makeDeletionRecord() = Future {
implicit val db = dbConfig.db
var user = ""
val currentDate = new Date()
val timestampOfNow = new Timestamp(currentDate.getTime)
var created = timestampOfNow
var workingGroupName = ""
ProjectEntry.entryForId(projectId).map({
case Success(projectEntry: ProjectEntry) =>
user = projectEntry.user
created = projectEntry.created
projectEntry.getWorkingGroup.map({
case Some(workingGroup: PlutoWorkingGroup) =>
workingGroupName = workingGroup.name
DeletionRecordDAO.getOrCreate(projectId, user, timestampOfNow, created, workingGroupName)
case None =>
logger.error(s"Could not get working group name for project ${projectId}")
DeletionRecordDAO.getOrCreate(projectId, user, timestampOfNow, created, "Unknown")
})
case Failure(error) =>
logger.error(s"Could not look up project entry for ${projectId}: ", error)
Left(error.toString)
})
}
val f = for {
f1 <- makeDeletionRecord()
f2 <- deletePTRJob()
f3 <- deleteFileJob()
f4 <- deleteBackupsJob()
f5 <- deleteDeliverables()
f6 <- deleteS3()
f7 <- deleteMatrix()
f8 <- deleteSAN()
} yield List(f1, f2, f3, f4, f5, f6, f7, f8)
if (pluto) {
Thread.sleep(800)
implicit val db = dbConfig.db
ProjectMetadata.deleteAllMetadataFor(projectId).map({
case Success(rows) =>
logger.info(s"Attempt at removing project metadata worked.")
case Failure(err) =>
logger.error(s"Could not delete metadata", err)
})
ProjectEntry.entryForId(projectId).map({
case Success(projectEntry: ProjectEntry) =>
projectEntry.removeFromDatabase.map({
case Success(_) =>
logger.info(s"Attempt at removing project record worked.")
case Failure(error) =>
logger.error(s"Attempt at removing project record failed with error: ${error}")
})
case Failure(error) =>
logger.error(s"Could not look up project entry for ${projectId}: ", error)
Left(error.toString)
})
}
}