app/controllers/DeletedItemsController.scala (162 lines of code) (raw):
package controllers
import akka.actor.ActorSystem
import akka.stream.Materializer
import akka.stream.scaladsl._
import akka.util.ByteString
import auth.{BearerTokenAuth, Security}
import com.sksamuel.elastic4s.searches.queries.Query
import com.theguardian.multimedia.archivehunter.common.{ArchiveEntry, ArchiveEntryHitReader, Indexer, StorageClassEncoder, ZonedDateTimeEncoder}
import com.theguardian.multimedia.archivehunter.common.clientManagers.{ESClientManager, S3ClientManager}
import com.theguardian.multimedia.archivehunter.common.cmn_models.{ItemNotFound, ScanTarget, ScanTargetDAO}
import helpers.{ItemFolderHelper, WithScanTarget}
import play.api.{Configuration, Logger}
import play.api.libs.circe.Circe
import play.api.mvc.{AbstractController, ControllerComponents, Request, ResponseHeader, Result}
import responses.{BulkDeleteConfirmationResponse, DeletionSummaryResponse, GenericErrorResponse, ObjectListResponse, PathInfoResponse}
import javax.inject.{Inject, Singleton}
import scala.concurrent.Future
import io.circe.syntax._
import io.circe.generic.auto._
import org.slf4j.LoggerFactory
import play.api.cache.SyncCacheApi
import play.api.http.HttpEntity
import requests.SearchRequest
import scala.annotation.switch
import scala.concurrent.ExecutionContext.Implicits.global
@Singleton
class DeletedItemsController @Inject() (override val config:Configuration,
scanTargetDAO:ScanTargetDAO,
override val controllerComponents: ControllerComponents,
esClientMgr:ESClientManager,
override val bearerTokenAuth: BearerTokenAuth,
override val cache:SyncCacheApi)
(implicit actorSystem:ActorSystem, mat:Materializer)
extends AbstractController(controllerComponents) with Security with WithScanTarget with ArchiveEntryHitReader
with ZonedDateTimeEncoder with StorageClassEncoder with Circe
{
import com.sksamuel.elastic4s.http.ElasticDsl._
import com.sksamuel.elastic4s.streams.ReactiveElastic._
private implicit val esClient = esClientMgr.getClient()
override protected val logger=LoggerFactory.getLogger(getClass)
private val indexName = config.get[String]("externalData.indexName")
private val indexer = new Indexer(indexName)
protected def makeQuery(collectionName:String, prefix:Option[String], searchRequest: SearchRequest) = {
val queries = Seq(
Some(matchQuery("bucket.keyword", collectionName)),
prefix.map(pfx => termQuery("path", pfx)),
Some(matchQuery("beenDeleted", true))
).collect({ case Some(x) => x }) ++ searchRequest.toSearchParams
boolQuery().must(queries)
}
protected def makeSearchRequest(q:Query) = {
val aggs = Seq(
sumAgg("totalSize", "size"),
)
search(indexName) query q aggregations aggs
}
/**
* returns a summary of how many items have been deleted for the given collection.
* @param collectionName
* @param prefix
* @return
*/
def deletedItemsSummary(collectionName:String, prefix:Option[String]) = IsAuthenticatedAsync(circe.json(2048)) { uid=> request=>
request.body.as[SearchRequest].fold(
err=>
Future(BadRequest(GenericErrorResponse("bad_request", err.toString).asJson)),
searchRequest=> {
withScanTargetAsync(collectionName, scanTargetDAO) { target =>
esClient.execute(makeSearchRequest(makeQuery(collectionName, prefix, searchRequest))).map(response => {
(response.status: @switch) match {
case 200 =>
logger.info(s"Got ${response.result.aggregations}")
Ok(DeletionSummaryResponse("ok",
response.result.hits.total,
response.result.aggregations.sum("totalSize").value.toLong
).asJson)
case _ =>
InternalServerError(GenericErrorResponse("search_error", response.error.reason).asJson)
}
}).recover({
case err: Throwable =>
logger.error(s"Could not scan $collectionName (with prefix $prefix) for deleted items: ${err.getMessage}", err)
InternalServerError(GenericErrorResponse("db_error", err.getMessage).asJson)
})
}
})
}
private def withQueryFromSearchdoc[T<:io.circe.Json](collectionName:String, prefix:Option[String], request:Request[T])
(cb:com.sksamuel.elastic4s.searches.queries.Query=>Future[Result]) =
request.body.as[SearchRequest].fold(
err=>
Future(BadRequest(GenericErrorResponse("bad_request", err.toString).asJson)),
searchRequest=>
withScanTargetAsync(collectionName, scanTargetDAO) { _ =>
val query = makeQuery(collectionName, prefix, searchRequest)
cb(query)
}
)
/**
* sends an NDJSON stream of items marked as deleted from the given collection and prefix
* @param collectionName collection name to scan
* @param prefix optional path prefix
* @return
*/
def deletedItemsListStreaming(collectionName:String, prefix:Option[String], limit:Option[Long]) = IsAuthenticatedAsync(circe.json(2048)) { uid=> request=>
withQueryFromSearchdoc(collectionName, prefix, request) { query=>
val source = Source.fromPublisher(esClient.publisher(makeSearchRequest(query).scroll("5m")))
val appliedLimit = limit.getOrElse(1000L)
val contentStream = source
.limit(appliedLimit)
.map(_.to[ArchiveEntry])
.map(_.asJson)
.map(_.noSpaces + "\n")
.map(ByteString.apply)
Future(Result(
header = ResponseHeader(status=200),
body = HttpEntity.Streamed(contentStream,contentType=Some("application/x-ndjson"), contentLength=None)
))
}
}
def bulkDeleteBySearch(collectionName:String, prefix:Option[String]) = IsAdminAsync(circe.json(2048)) { uid=> request=>
withQueryFromSearchdoc(collectionName, prefix, request) { query=>
esClient
.execute(deleteByQuery(indexName, "entry", query))
.map(response=>{
if(response.status>=200 && response.status<=299) {
Ok(BulkDeleteConfirmationResponse("ok",response.result.deleted, response.result.took).asJson)
} else {
logger.error(s"Could not perform bulk deletion of tombstones for $collectionName at $prefix: ${response.status} ${response.error}")
InternalServerError(GenericErrorResponse("db_error", response.error.toString).asJson)
}
})
.recover({
case err:Throwable=>
logger.error(s"Bulk delete thread for $collectionName at $prefix crashed: ${err.getMessage}",err)
InternalServerError(GenericErrorResponse("db_error", err.toString).asJson)
})
}
}
private def performItemDelete(collectionName:String, itemId:String) = esClient
.execute(deleteById(indexName, "entry", itemId))
.map(response=>{
if(response.status==200 || response.status==201) {
Ok(GenericErrorResponse("ok","item deleted").asJson)
} else {
logger.error(s"Could not delete item $itemId from $collectionName: ${response.status} ${response.error.reason}")
InternalServerError(GenericErrorResponse("db_error","index returned error").asJson)
}
})
.recover({
case err:Throwable=>
logger.error(s"Item delete request failed: ${err.getMessage}", err);
InternalServerError(GenericErrorResponse("internal_error", err.getMessage).asJson)
})
def removeTombstoneById(collectionName:String, itemId:String) = IsAdminAsync { uid=> request=>
indexer.getByIdFull(itemId).flatMap({
case Left(ItemNotFound(itemId))=>
Future(NotFound(GenericErrorResponse("not_found", itemId).asJson))
case Left(other)=>
logger.error(s"Could not look up item $itemId from $collectionName: ${other.errorDesc}")
Future(InternalServerError(GenericErrorResponse("error", other.errorDesc).asJson))
case Right(entry)=>
if(entry.bucket!=collectionName) {
logger.warn(s"Invalid tombstone removal request: $itemId exists but not within bucket $collectionName, returning 404")
Future(NotFound(GenericErrorResponse("not_found", itemId).asJson))
} else if(!entry.beenDeleted) {
logger.warn(s"Invalid tombstone removal request: $itemId is not a tombstone")
Future(Conflict(GenericErrorResponse("conflict","this item is not a tombstone").asJson))
} else {
performItemDelete(collectionName, itemId)
}
})
}
}