thrall/app/controllers/ReaperController.scala (184 lines of code) (raw):

package controllers

import org.apache.pekko.actor.Scheduler
import com.gu.mediaservice.lib.{DateTimeUtils, ImageIngestOperations}
import com.gu.mediaservice.lib.auth.Permissions.DeleteImage
import com.gu.mediaservice.lib.auth.{Authentication, Authorisation, BaseControllerWithLoginRedirects}
import com.gu.mediaservice.lib.config.Services
import com.gu.mediaservice.lib.elasticsearch.ReapableEligibility
import com.gu.mediaservice.lib.logging.{GridLogging, MarkerMap}
import com.gu.mediaservice.lib.metadata.SoftDeletedMetadataTable
import com.gu.mediaservice.model.{ImageStatusRecord, SoftDeletedMetadata}
import lib.{BatchDeletionIds, ThrallConfig, ThrallMetrics, ThrallStore}
import lib.elasticsearch.ElasticSearch
import org.joda.time.{DateTime, DateTimeZone}
import play.api.libs.json.{JsValue, Json}
import play.api.mvc.{Action, AnyContent, ControllerComponents}

import scala.concurrent.{ExecutionContext, Future}
import scala.jdk.CollectionConverters._
import scala.language.postfixOps
import scala.util.control.NonFatal
import scala.util.{Failure, Success}

/**
  * Controller for the "reaper": a scheduled job (plus manual endpoints) that soft-deletes and
  * hard-deletes batches of images selected by a [[ReapableEligibility]].
  *
  * The scheduled job is only registered when both `config.maybeReaperBucket` and
  * `config.maybeReaperCountPerRun` are defined. It is paused by the presence of an object named
  * `PAUSED` in the reaper bucket (see `pauseReaper` / `resumeReaper`).
  *
  * Every successful batch writes its JSON result to the reaper bucket so that it can be listed by
  * `index` and read back via `reaperRecord`.
  */
class ReaperController(
  es: ElasticSearch,
  store: ThrallStore,
  authorisation: Authorisation,
  config: ThrallConfig,
  scheduler: Scheduler,
  maybeCustomReapableEligibility: Option[ReapableEligibility],
  softDeletedMetadataTable: SoftDeletedMetadataTable,
  metrics: ThrallMetrics,
  override val auth: Authentication,
  override val services: Services,
  override val controllerComponents: ControllerComponents,
)(implicit val ec: ExecutionContext) extends BaseControllerWithLoginRedirects with GridLogging {

  /** Key of the marker object in the reaper bucket whose existence pauses the scheduled reaper. */
  private val CONTROL_FILE_NAME = "PAUSED"

  // default 15 minutes, based on max of 1000 per reap, this interval will max out at 96,000 images per day
  private val INTERVAL = config.reaperInterval

  implicit val logMarker: MarkerMap = MarkerMap()

  /** The eligibility rules in force: the injected custom one if present, otherwise one built from config. */
  private val isReapable = maybeCustomReapableEligibility getOrElse {
    new ReapableEligibility {
      override val maybePersistOnlyTheseCollections: Option[Set[String]] = config.maybePersistOnlyTheseCollections
      override val persistenceIdentifier: String = config.persistenceIdentifier
    }
  }

  // Register the scheduled reaper. This runs as part of construction, so it must stay below the
  // vals it reads (INTERVAL, isReapable).
  (config.maybeReaperBucket, config.maybeReaperCountPerRun) match {
    case (Some(reaperBucket), Some(countOfImagesToReap)) =>
      scheduler.scheduleAtFixedRate(
        initialDelay = DateTimeUtils.timeUntilNextInterval(INTERVAL), // so we always start on multiples of the interval past the hour
        interval = INTERVAL,
      ){ () =>
        try {
          if (store.client.doesObjectExist(reaperBucket, CONTROL_FILE_NAME)) {
            logger.info("Reaper is paused")
            // Keep the "reapable" metrics up to date even though nothing is being deleted.
            recordSoftReapableCount()
            recordHardReapableCount()
          } else {
            val deletedBy = "reaper"
            // Both batches are started before sequencing, so they run concurrently.
            Future.sequence(Seq(
              doBatchSoftReap(countOfImagesToReap, deletedBy),
              doBatchHardReap(countOfImagesToReap, deletedBy)
            )).onComplete {
              case Success(_) => logger.info("Reap completed")
              case Failure(e) => logger.error("Reap failed", e)
            }
          }
        } catch {
          // Synchronous failures (e.g. from the S3 existence check) are logged rather than rethrown.
          case NonFatal(e) => logger.error("Reap failed", e)
        }
      }
    case _ =>
      logger.info("scheduled reaper will not run since 's3.reaper.bucket' and 'reaper.countPerRun' need to be configured in thrall.conf")
  }

  /**
    * Logs the failure of a fire-and-forget future instead of letting it disappear silently.
    *
    * @param description    what the future was doing; used as the prefix of the log message
    * @param eventualResult the future to observe (its successful value is ignored)
    */
  private def logIfFailed[T](description: String)(eventualResult: Future[T]): Unit =
    eventualResult.failed.foreach(e => logger.error(s"$description failed", e))

  /** Counts the images currently eligible for soft reaping and feeds the count to the `softReapable` metric. */
  private def recordSoftReapableCount(): Unit = logIfFailed("Recording soft reapable count") {
    es.countTotalSoftReapable(isReapable).map(metrics.softReapable.increment(Nil, _))
  }

  /** Counts the images currently eligible for hard reaping and feeds the count to the `hardReapable` metric. */
  private def recordHardReapableCount(): Unit = logIfFailed("Recording hard reapable count") {
    es.countTotalHardReapable(isReapable, config.hardReapImagesAge).map(metrics.hardReapable.increment(Nil, _))
  }

  /**
    * Wraps a batch delete function in an authenticated action.
    *
    * Responds `Forbidden` when the user lacks the `DeleteImage` permission and `BadRequest` when
    * `count` is negative or greater than 1000; otherwise runs `func` and returns its JSON as `Ok`.
    *
    * @param count number of images to process in this batch
    * @param func  the batch operation, given the count and the identity of the requesting user
    */
  private def batchDeleteWrapper(count: Int)(func: (Int, String) => Future[JsValue]) = auth.async { request =>
    if (!authorisation.hasPermissionTo(DeleteImage)(request.user)) {
      Future.successful(Forbidden)
    } else if (count < 0) {
      // A negative batch size is meaningless; reject it up front instead of passing it to Elasticsearch.
      Future.successful(BadRequest("Count must not be negative."))
    } else if (count > 1000) {
      Future.successful(BadRequest("Too many IDs. Maximum 1000."))
    } else {
      func(
        count,
        request.user.accessor.identity
      ).map(Ok(_))
    }
  }

  /** Name of the per-day "directory" (key prefix segment) under which reaper records are stored. */
  private def s3DirNameFromDate(date: DateTime) = date.toString("YYYY-MM-dd")

  /**
    * Runs a batch delete and, on success, stores its JSON result in the reaper bucket under
    * `<deleteType>/<day>/<deleteType>-<timestamp>.json`.
    *
    * Fails without evaluating `doBatchDelete` when no reaper bucket is configured.
    *
    * @param deleteType    record category, used both as key prefix and file name prefix
    * @param doBatchDelete the (lazily evaluated) batch operation producing the record to persist
    * @return the JSON produced by `doBatchDelete`
    */
  private def persistedBatchDeleteOperation(deleteType: String)(doBatchDelete: => Future[JsValue]) = config.maybeReaperBucket match {
    case None => Future.failed(new Exception("Reaper bucket not configured"))
    case Some(reaperBucket) => doBatchDelete.map { json =>
      val now = DateTime.now(DateTimeZone.UTC)
      val key = s"$deleteType/${s3DirNameFromDate(now)}/$deleteType-${now.toString()}.json"
      store.client.putObject(reaperBucket, key, json.toString())
      json
    }
  }

  /** HTTP entry point: soft-reaps up to `count` images on behalf of the requesting user. */
  def doBatchSoftReap(count: Int): Action[AnyContent] = batchDeleteWrapper(count)(doBatchSoftReap)

  /**
    * Soft-deletes the next batch of reapable images and records the outcome.
    *
    * @param count     maximum number of images to soft delete
    * @param deletedBy identity recorded against the deletion
    * @return JSON object keyed by image id, each value detailing whether the image was soft deleted in ES
    */
  def doBatchSoftReap(count: Int, deletedBy: String): Future[JsValue] = persistedBatchDeleteOperation("soft"){

    recordSoftReapableCount()

    logger.info(s"Soft deleting next $count images...")

    val deleteTime = DateTime.now(DateTimeZone.UTC)

    (for {
      BatchDeletionIds(esIds, esIdsActuallySoftDeleted) <- es.softDeleteNextBatchOfImages(isReapable, count, SoftDeletedMetadata(deleteTime, deletedBy))
      // Mirror the soft-deleted state into the soft-deleted metadata table.
      _ <- softDeletedMetadataTable.setStatuses(esIdsActuallySoftDeleted.map(
        ImageStatusRecord(
          _,
          deletedBy,
          deleteTime = deleteTime.toString,
          isDeleted = true
        )
      ))
    } yield {
      metrics.softReaped.increment(n = esIdsActuallySoftDeleted.size)
      esIds.map { id =>
        val wasSoftDeletedInES = esIdsActuallySoftDeleted.contains(id)
        val detail = Map(
          "ES" -> wasSoftDeletedInES,
        )
        logger.info(s"Soft deleted image $id : $detail")
        id -> detail
      }.toMap
    }).map(Json.toJson(_))
  }

  /** HTTP entry point: hard-reaps up to `count` images on behalf of the requesting user. */
  def doBatchHardReap(count: Int): Action[AnyContent] = batchDeleteWrapper(count)(doBatchHardReap)

  /**
    * Hard-deletes the next batch of reapable images from ES, then removes their originals,
    * thumbnails and optimised PNGs from the store and clears their soft-deleted statuses.
    *
    * @param count     maximum number of images to hard delete
    * @param deletedBy identity of the caller (not used by the hard delete itself)
    * @return JSON object keyed by image id, each value detailing the outcome per storage location
    */
  def doBatchHardReap(count: Int, deletedBy: String): Future[JsValue] = persistedBatchDeleteOperation("hard"){

    recordHardReapableCount()

    logger.info(s"Hard deleting next $count images...")

    (for {
      BatchDeletionIds(esIds, esIdsActuallyDeleted) <- es.hardDeleteNextBatchOfImages(isReapable, count, config.hardReapImagesAge)
      mainImagesS3Deletions <- store.deleteOriginals(esIdsActuallyDeleted)
      thumbsS3Deletions <- store.deleteThumbnails(esIdsActuallyDeleted)
      pngsS3Deletions <- store.deletePNGs(esIdsActuallyDeleted)
      _ <- softDeletedMetadataTable.clearStatuses(esIdsActuallyDeleted)
    } yield {
      metrics.hardReaped.increment(n = esIdsActuallyDeleted.size)
      esIds.map { id =>
        val wasHardDeletedFromES = esIdsActuallyDeleted.contains(id)
        val detail = Map(
          "ES" -> Some(wasHardDeletedFromES),
          "mainImage" -> mainImagesS3Deletions.get(ImageIngestOperations.fileKeyFromId(id)),
          "thumb" -> thumbsS3Deletions.get(ImageIngestOperations.fileKeyFromId(id)),
          "optimisedPng" -> pngsS3Deletions.get(ImageIngestOperations.optimisedPngKeyFromId(id)),
        )
        logger.info(s"Hard deleted image $id : $detail")
        id -> detail
      }.toMap
    }).map(Json.toJson(_))
  }

  /**
    * Status page: shows whether the reaper is paused, its interval and batch size, and the keys
    * of the records written in the last 48 hours (newest first).
    *
    * Responds `NotImplemented` when the reaper bucket or the count per run is not configured.
    */
  def index = withLoginRedirect {
    val now = DateTime.now(DateTimeZone.UTC)
    (config.maybeReaperBucket, config.maybeReaperCountPerRun) match {
      case (None, _) => NotImplemented("'s3.reaper.bucket' not configured in thrall.conf")
      case (_, None) => NotImplemented("'reaper.countPerRun' not configured in thrall.conf")
      case (Some(reaperBucket), Some(countOfImagesToReap)) =>
        val isPaused = store.client.doesObjectExist(reaperBucket, CONTROL_FILE_NAME)
        // Records are stored per day, so a 48 hour window can touch three day prefixes.
        val recentRecords = List(now, now.minusDays(1), now.minusDays(2)).flatMap { day =>
          val s3DirName = s3DirNameFromDate(day)
          store.client.listObjects(reaperBucket, s"soft/$s3DirName/").getObjectSummaries.asScala.toList ++
            store.client.listObjects(reaperBucket, s"hard/$s3DirName/").getObjectSummaries.asScala.toList
        }

        val recentRecordKeys = recentRecords
          .filter(_.getLastModified after now.minusHours(48).toDate)
          .sortBy(_.getLastModified)
          .reverse
          .map(_.getKey)

        Ok(views.html.reaper(isPaused, INTERVAL.toString(), countOfImagesToReap, recentRecordKeys))
    }
  }

  /**
    * Returns the object stored under `key` in the reaper bucket, served as JSON.
    *
    * @param key object key within the reaper bucket
    */
  def reaperRecord(key: String) = auth {
    config.maybeReaperBucket match {
      case None => NotImplemented("Reaper bucket not configured")
      case Some(reaperBucket) =>
        Ok(
          store.client.getObjectAsString(reaperBucket, key)
        ).as(JSON)
    }
  }

  /** Pauses the scheduled reaper by creating the control object, then redirects to the status page. */
  def pauseReaper = auth {
    config.maybeReaperBucket match {
      case None => NotImplemented("Reaper bucket not configured")
      case Some(reaperBucket) =>
        store.client.putObject(reaperBucket, CONTROL_FILE_NAME, "")
        Redirect(routes.ReaperController.index)
    }
  }

  /** Resumes the scheduled reaper by deleting the control object, then redirects to the status page. */
  def resumeReaper = auth {
    config.maybeReaperBucket match {
      case None => NotImplemented("Reaper bucket not configured")
      case Some(reaperBucket) =>
        store.client.deleteObject(reaperBucket, CONTROL_FILE_NAME)
        Redirect(routes.ReaperController.index)
    }
  }
}