app/helpers/LightboxHelper.scala (152 lines of code) (raw):

package helpers

import java.time.ZonedDateTime
import akka.actor.{ActorRefFactory, ActorSystem}
import akka.stream.{ClosedShape, Materializer}
import akka.stream.scaladsl.{Broadcast, Flow, GraphDSL, Keep, Merge, RunnableGraph, Sink, Source}
import com.google.inject.Injector
import com.sksamuel.elastic4s.http.{ElasticClient, ElasticError, HttpClient, RequestFailure}
import com.sksamuel.elastic4s.searches.SearchRequest
import com.sksamuel.elastic4s.searches.queries.Query
import com.theguardian.multimedia.archivehunter.common.{ArchiveEntry, Indexer, LightboxIndex, StorageClass}
import com.theguardian.multimedia.archivehunter.common.cmn_models.{LightboxBulkEntry, LightboxEntry, LightboxEntryDAO, RestoreStatus}
import helpers.LightboxStreamComponents._
import models.UserProfile
import play.api.Logger
import responses.{GenericErrorResponse, ObjectListResponse, QuotaExceededResponse}

import scala.annotation.switch
import scala.concurrent.{ExecutionContext, Future, Promise}
import scala.util.{Failure, Success}

/**
  * Helper object containing the logic for adding items to / removing items from a user's lightbox,
  * both singly and in bulk via an ElasticSearch query streamed through Akka Streams graphs.
  */
object LightboxHelper {
  import com.sksamuel.elastic4s.streams.ReactiveElastic._
  import com.sksamuel.elastic4s.http.ElasticDsl._

  protected def logger = Logger(getClass)

  /**
    * create a lightbox entry record in the dynamo table
    * @param userProfile user profile for the user that is storing the item
    * @param indexEntry archiveEntry object for the item being stored
    * @param bulkId if the item is being added as part of a bulk set, the ID of that set; otherwise None
    * @param lightboxEntryDAO implicitly provided Data Access object
    * @param ec implicitly provided execution context
    * @return a Future, containing a Try which either has the created LightboxEntry or a Failure
    */
  def saveLightboxEntry(userProfile:UserProfile, indexEntry:ArchiveEntry, bulkId:Option[String])(implicit lightboxEntryDAO: LightboxEntryDAO, ec:ExecutionContext) = {
    // items in Glacier will need a restore before they are accessible; anything else is immediately available
    val expectedRestoreStatus = indexEntry.storageClass match {
      case StorageClass.GLACIER => RestoreStatus.RS_PENDING
      case _ => RestoreStatus.RS_UNNEEDED
    }

    val lbEntry = LightboxEntry(userProfile.userEmail, indexEntry.id, ZonedDateTime.now(),
      expectedRestoreStatus, None, None, None, None, bulkId)

    // NOTE(review): this catches Throwable (not just NonFatal) and converts the failed Future into a
    // successful Future of Failure(err); callers rely on inspecting the inner Try, so left as-is.
    lightboxEntryDAO.put(lbEntry).map(Success.apply).recover({
      case err:Throwable=>
        logger.error(s"Could not save lightbox entry for ${userProfile.userEmail}: ${err.getMessage}", err)
        Failure(err)
    })
  }

  /**
    * update the index entry for the given item to show that it has been lightboxed.
    * @param userProfile profile for the user that is doing the lightboxing.
    * @param userAvatarUrl URL for the user's avatar. this is added into the index to improve performance in the UI by preventing the need for extra lookups
    * @param indexEntry ArchiveEntry representing the item being lightboxed
    * @param bulkId If the item is being added as part of a bulk set, this should contain the ID of the bulk set, otherwise it's None
    * @param esClient implicitly provided ElasticSearch HttpClient
    * @param indexer implicitly provided Indexer instance
    * @param ec implicitly provided Execution Context
    * @return a Future, containing Success with the updated index item's ID or a Failure if something broke.
    */
  def updateIndexLightboxed(userProfile:UserProfile, userAvatarUrl:Option[String], indexEntry:ArchiveEntry, bulkId:Option[String])(implicit esClient:ElasticClient, indexer:Indexer, ec:ExecutionContext) = {
    val lbIndex = LightboxIndex(userProfile.userEmail,userAvatarUrl, ZonedDateTime.now(), bulkId)
    logger.debug(s"lbIndex is $lbIndex")
    // append the new lightbox record to any already on the item, then re-index the whole entry
    val updatedEntry = indexEntry.copy(lightboxEntries = indexEntry.lightboxEntries ++ Seq(lbIndex))
    logger.debug(s"updateEntry is $updatedEntry")
    indexer.indexSingleItem(updatedEntry,Some(updatedEntry.id))
  }

  /**
    * internal method, return an Akka Streams Source for the given ES query
    * @param indexName ElasticSearch index to query
    * @param queryParams Elastic4s Query describing the query to perform
    * @param esClient implicitly provided Elastic4s HttpClient
    * @param actorRefFactory implicitly provided ActorRefFactory, get this from an ActorSystem
    * @return a Source that yields SearchHit entries. Connect this to SearchHitToArchiveEntryFlow to convert to domain objects.
    */
  def getElasticSource(indexName:String, queryParams:Query)(implicit esClient:ElasticClient, actorRefFactory:ActorRefFactory) = {
    // scroll keeps the ES cursor alive between fetches while the stream is consumed
    val publisher = esClient.publisher(search(indexName) query queryParams scroll "5m")
    Source.fromPublisher(publisher)
  }

  /**
    * as [[getElasticSource]] above, but takes a pre-built SearchRequest rather than an index name and query.
    */
  def getElasticSource(searchDefinition: SearchRequest)(implicit esClient:ElasticClient, actorRefFactory:ActorRefFactory) = {
    val publisher = esClient.publisher(searchDefinition scroll "5m")
    Source.fromPublisher(publisher)
  }

  /**
    * performs a search expressed as a SearchRequest, and returns the total size in bytes that it would process.
    * Used for checking whether a search would breach quota limits
    * @param indexName index to run the search against
    * @param rq SearchRequest instance
    * @param esClient implicitly provided elasticsearch HttpClient
    * @param actorRefFactory implicitly provided ActorRefFactory
    * @param materializer implicitly provided Materializer
    * @return a Future, containing a Long which is the number of bytes. Yields 0 if the search matched nothing.
    */
  def getTotalSizeOfSearch(indexName:String, rq:requests.SearchRequest)(implicit esClient:ElasticClient, actorRefFactory:ActorRefFactory, materializer:Materializer) = {
    logger.info(s"${boolQuery().must(rq.toSearchParams)}")
    val src = getElasticSource(indexName, boolQuery().must(rq.toSearchParams))
    val archiveEntryConverter = new SearchHitToArchiveEntryFlow

    // FIX: previously used Sink.reduce, which fails the Future with NoSuchElementException when the
    // search matches zero items. Sink.fold with a 0L seed correctly reports an empty search as size 0.
    src.via(archiveEntryConverter).map(entry=>{
      logger.info(entry.toString)
      entry.size
    }).async.toMat(Sink.fold[Long, Long](0L)((acc, entry)=>acc+entry))(Keep.right).run()
  }

  /**
    * check the total size of a bulk restore before carrying it out.
    * @param indexName index name to search
    * @param userProfile user doing the restore
    * @param searchReq SearchRequest object with the parameters describing the bulk search
    * @param esClient implicitly provided ElasticSearch HttpClient
    * @param actorRefFactory implicitly provided ActorRefFactory, from ActorSystem
    * @param materializer implicitly provided ActorMaterializer
    * @param ec implicitly provided ExecutionContext
    * @return a Future, with either a QuotaExceededResponse indicating that the restore should not be allowed
    *         or a Long indicating that it should, and giving the total size in Mb of the restore.
    */
  def testBulkAddSize(indexName:String, userProfile: UserProfile, searchReq:requests.SearchRequest)(implicit esClient:ElasticClient, actorRefFactory:ActorRefFactory, materializer:Materializer, ec:ExecutionContext) = {
    logger.info(s"Checking size of $searchReq")
    LightboxHelper.getTotalSizeOfSearch(indexName,searchReq)
      .map(totalSize=>{
        val totalSizeMb = totalSize/1048576L
        logger.info(s"Total size is $totalSizeMb Mb, userQuota is ${userProfile.perRestoreQuota.getOrElse(0L)}Mb")
        // NOTE(review): a missing quota is treated as 0Mb, i.e. any non-trivial restore is refused — confirm intended
        if(totalSizeMb > userProfile.perRestoreQuota.getOrElse(0L)) {
          Left(QuotaExceededResponse("quota_exceeded","Your per-request quota has been exceeded",totalSizeMb, userProfile.perRestoreQuota.getOrElse(0)))
        } else {
          Right(totalSizeMb)
        }
      })
  }

  /**
    * run the provided SearchRequest and add the result to the given LightboxBulkEntry.
    * if any elements require restore from Glacier, this will be initiated
    * @param indexName index name to query
    * @param userProfile profile of the user requesting the restore
    * @param userAvatarUrl URL for the user's avatar, cached into the index record
    * @param rq SearchRequest with the search terms to restore
    * @param bulk initialised LightboxBulkEntry to associate the items with
    * @param lightboxEntryDAO implicitly provided Data Access Object for lightbox entries
    * @param system implicitly provided ActorSystem
    * @param esClient implicitly provided Elastic4s HttpClient
    * @param indexer implicitly provided Indexer object
    * @param mat implicitly provided ActorMaterializer
    * @return a Future, with the LightboxBulkEntry updated to show the number of items it now has associated
    */
  def addToBulkFromSearch(indexName:String, userProfile:UserProfile, userAvatarUrl:Option[String], rq:requests.SearchRequest, bulk:LightboxBulkEntry)
                         (implicit lightboxEntryDAO: LightboxEntryDAO, system:ActorSystem, esClient:ElasticClient, indexer:Indexer, mat:Materializer, ec:ExecutionContext, injector:Injector) = {
    val archiveEntryConverter = new SearchHitToArchiveEntryFlow
    val dynamoSaveFlow = new SaveLightboxEntryFlow(bulk.id, userProfile)
    val maybeRestoreSink = injector.getInstance(classOf[InitiateRestoreSink])
    val esSaveSink = new UpdateLightboxIndexInfoSink(bulk.id, userProfile, userAvatarUrl)

    logger.info(s"Starting add of $rq to bulk lightbox ${bulk.id}" )
    // graph: ES search -> domain objects -> broadcast to (dynamo save -> restore initiation) and (ES index update)
    val flow = RunnableGraph.fromGraph(GraphDSL.create(dynamoSaveFlow,esSaveSink, maybeRestoreSink)((_,_,_)) { implicit b=> (actualDynamoFlow, actualESSink, actualRestoreSink) =>
      import akka.stream.scaladsl.GraphDSL.Implicits._
      val src = b.add(getElasticSource(indexName, boolQuery().must(rq.toSearchParams)))
      val entryConverter = b.add(archiveEntryConverter)
      // eagerCancel=true: if either downstream branch fails, stop the whole ingest
      val bcast = b.add(new Broadcast[ArchiveEntry](2, eagerCancel = true))

      src ~> entryConverter ~> bcast ~> actualDynamoFlow ~> actualRestoreSink
      bcast ~> actualESSink
      ClosedShape
    })

    val resultFutures = flow.run()
    // resultFutures is (dynamoCount, esCount); both branches should have processed the same items
    Future.sequence(Seq(resultFutures._1, resultFutures._2)).map(addedCounts=>{
      logger.info(s"addedCounts (dynamo) ${addedCounts.head}, addedCounts (ES) ${addedCounts(1)}")
      if(addedCounts.head != addedCounts(1)){
        logger.warn(s"Mismatch between dynamodb and elastic outputs - ${addedCounts.head} records saved to dynamo but ${addedCounts(1)} records saved to ES")
      }
      bulk.copy(availCount = bulk.availCount + addedCounts.head)
    })
  }

  /**
    * return an ES query definition for the bulk id - either field not existing for "loose" or matching the id
    * @param actualBulkId bulk ID to match, or the sentinel value "loose"
    * @return an elastic4s query definition
    */
  protected def lightboxQuery(actualBulkId:String) = if(actualBulkId=="loose"){
    boolQuery().withNot(existsQuery("lightboxEntries.memberOfBulk"))
  } else {
    matchQuery("lightboxEntries.memberOfBulk", actualBulkId)
  }

  /**
    * build a search for all of a user's lightboxed items, optionally restricted to a given bulk
    * (or, with Some("loose"), to items in no bulk at all).
    * @param indexName index to search
    * @param bulkId optional bulk ID restriction, passed through [[lightboxQuery]]
    * @param userEmail owner of the lightbox entries
    * @return a SearchRequest ready to execute
    */
  def lightboxSearch(indexName:String, bulkId:Option[String], userEmail:String) = {
    val queryTerms = Seq(
      Some(matchQuery("lightboxEntries.owner", userEmail)),
      bulkId.map(actualBulkId=>lightboxQuery(actualBulkId))
    ).collect({case Some(term)=>term})

    // lightboxEntries is a nested mapping so both terms must match within the same nested document
    search(indexName) query {
      nestedQuery(path="lightboxEntries", query = {
        boolQuery().must(queryTerms)
      })
    }
  }

  /**
    * returns a future that will contain the number of items that aren't associated with a given bulk entry
    * @param indexName index to search
    * @param userEmail user whose lightbox is being counted
    * @param esClient implicitly provided elasticsearch client
    * @return a Future with either the ElasticError on failure or the count of "loose" items
    */
  def getLooseCountForUser(indexName:String, userEmail:String)(implicit esClient:ElasticClient, ec:ExecutionContext):Future[Either[ElasticError, Int]] = {
    esClient.execute {
      // limit(0) because only the hit total is needed, not the documents themselves
      lightboxSearch(indexName, Some("loose"), userEmail) limit(0)
    }.map(response=>{
      // FIX: removed the ineffective @switch annotation - a match with a single literal case and a
      // default cannot compile to a tableswitch, so the annotation only produced a compiler warning.
      response.status match {
        case 200=>
          Right(response.result.hits.total.toInt)
        case _=>
          Left(response.error)
      }
    })
  }

  /**
    * removes the contents of the provided LightboxBulkEntry from the lightbox of the provided user
    * @param indexName index name to search
    * @param userProfile user that is being updated
    * @param bulk LightboxBulkEntry that is being removed
    * @param lightboxEntryDAO implicitly provided Data Access Object for lightbox entries
    * @param system implicitly provided ActorSystem
    * @param esClient implicitly provided elasticsearch client
    * @param indexer implicitly provided Indexer
    * @param mat implicitly provided Materializer
    * @param ec implicitly provided ExecutionContext
    * @return a Future, containing an Int of the number of items removed. If the stream errors then the future fails.
    */
  def removeBulkContents(indexName:String, userProfile:UserProfile, bulk:LightboxBulkEntry)
                        (implicit lightboxEntryDAO: LightboxEntryDAO, system:ActorSystem, esClient:ElasticClient, indexer:Indexer, mat:Materializer, ec:ExecutionContext) = {
    val dynamoSaveSink = new RemoveLightboxEntrySink(userProfile.userEmail)
    val esSaveSink = new RemoveLightboxIndexInfoSink(userProfile.userEmail)

    logger.info(s"bulkid is ${bulk.id}")
    // graph: ES search for bulk members -> domain objects -> broadcast to ES-removal and dynamo-removal sinks
    val flow = RunnableGraph.fromGraph(GraphDSL.create(dynamoSaveSink, esSaveSink)((_,_)) { implicit b => (actualDynamoSink, actualESSink) =>
      import akka.stream.scaladsl.GraphDSL.Implicits._
      val queryDef = nestedQuery(path="lightboxEntries", query = {
        matchQuery("lightboxEntries.memberOfBulk", bulk.id)
      })
      val src = b.add(getElasticSource(indexName, queryDef))
      val entryConverter = b.add(new SearchHitToArchiveEntryFlow)
      // eagerCancel=false here: let the remaining branch finish its removals even if the other cancels
      val bcast = b.add(new Broadcast[ArchiveEntry](2, eagerCancel = false))

      src ~> entryConverter ~> bcast ~> actualESSink
      bcast ~> actualDynamoSink
      ClosedShape
    })

    val resultFutures = flow.run()
    Future.sequence(Seq(resultFutures._1, resultFutures._2)).map(addedCounts=> {
      if (addedCounts.head != addedCounts(1)) {
        logger.warn(s"Mismatch between dynamodb and elastic outputs - ${addedCounts.head} records saved to dynamo but ${addedCounts(1)} records saved to ES")
      }
      logger.info(s"Removed ${addedCounts.head} records from dynamo, ${addedCounts(1)} from ES")
      addedCounts.head
    })
  }
}