media-api/app/lib/elasticsearch/ElasticSearch.scala (323 lines of code) (raw):

package lib.elasticsearch

import org.apache.pekko.actor.Scheduler
import com.gu.mediaservice.lib.ImageFields
import com.gu.mediaservice.lib.elasticsearch.filters
import com.gu.mediaservice.lib.auth.Authentication.Principal
import com.gu.mediaservice.lib.elasticsearch.{CompletionPreview, ElasticSearchClient, ElasticSearchConfig, MigrationStatusProvider, Running}
import com.gu.mediaservice.lib.logging.{GridLogging, MarkerMap}
import com.gu.mediaservice.lib.metrics.FutureSyntax
import com.gu.mediaservice.model.{Agencies, Agency, AwaitingReviewForSyndication, Image}
import com.sksamuel.elastic4s.ElasticDsl
import com.sksamuel.elastic4s.ElasticDsl._
import com.sksamuel.elastic4s.requests.get.{GetRequest, GetResponse}
import com.sksamuel.elastic4s.requests.script.{Script, ScriptField}
import com.sksamuel.elastic4s.requests.searches._
import com.sksamuel.elastic4s.requests.searches.aggs.Aggregation
import com.sksamuel.elastic4s.requests.searches.aggs.responses.Aggregations
import com.sksamuel.elastic4s.requests.searches.aggs.responses.bucket.{DateHistogram, Terms}
import com.sksamuel.elastic4s.requests.searches.queries.Query
import lib.querysyntax.{HierarchyField, Match, Parser, Phrase}
import lib.{MediaApiConfig, MediaApiMetrics, SupplierUsageSummary}
import play.api.libs.json.{JsError, JsObject, JsSuccess, Json}
import play.api.mvc.AnyContent
import play.api.mvc.Security.AuthenticatedRequest
import play.mvc.Http.Status
import scalaz.NonEmptyList
import scalaz.syntax.std.list._

import java.util.concurrent.TimeUnit
import scala.concurrent.duration.FiniteDuration
import scala.concurrent.{ExecutionContext, Future}

/**
 * Read-side Elasticsearch access for the media API: image lookups by id, image search,
 * aggregations and completion suggestions.
 *
 * Reads are migration-aware: while a migration is `Running` (or in `CompletionPreview`),
 * lookups and searches also (or only) target the migration index, as described on each method.
 *
 * @param config            media API configuration
 * @param mediaApiMetrics   metrics sink used to time search/aggregation/suggestion queries
 * @param elasticConfig     source of the index aliases, cluster url, shard and replica counts
 * @param overQuotaAgencies supplier of agencies currently over quota, passed to the [[QueryBuilder]]
 * @param scheduler         scheduler exposed to the mixed-in traits
 */
class ElasticSearch(
  val config: MediaApiConfig,
  mediaApiMetrics: MediaApiMetrics,
  elasticConfig: ElasticSearchConfig,
  overQuotaAgencies: () => List[Agency],
  val scheduler: Scheduler
) extends ElasticSearchClient with ImageFields with MatchFields with FutureSyntax with GridLogging with MigrationStatusProvider {

  /** Name of the filter aggregation that counts images owned by the staff photographer organisation. */
  private val orgOwnedAggName = "org-owned"

  lazy val imagesCurrentAlias = elasticConfig.aliases.current
  lazy val imagesMigrationAlias = elasticConfig.aliases.migration
  lazy val url = elasticConfig.url
  lazy val shards = elasticConfig.shards
  lazy val replicas = elasticConfig.replicas

  /**
   * Timeout applied to every search built by `prepareSearch`.
   * There is a 15 second timeout set at the cluster level as well.
   *
   * Elasticsearch's `allow_partial_search_results` setting is true by default. A search that hits
   * this timeout therefore returns the partial results gathered so far rather than everything.
   * For example, a query for `foo=bar` that yields 6500 results with no timeout might yield only
   * 4000 results with a 1ms timeout.
   */
  private val SearchQueryTimeout = FiniteDuration(10, TimeUnit.SECONDS)

  val searchFilters: SearchFilters = new SearchFilters(config)
  val syndicationFilter: SyndicationFilter = new SyndicationFilter(config)
  val queryBuilder: QueryBuilder = new QueryBuilder(matchFields, overQuotaAgencies, config)

  /** Fetches an image by id, dropping the raw-source wrapper returned by [[getImageWithSourceById]]. */
  def getImageById(id: String)(implicit ex: ExecutionContext, request: AuthenticatedRequest[AnyContent, Principal]): Future[Option[Image]] =
    getImageWithSourceById(id).map(_.map(_.instance))

  /**
   * Runs a get-by-id request, taking any in-progress migration into account.
   *
   * While a migration is `Running`, the migration index is tried first. If that response is not
   * HTTP 200, the lookup falls back to the current alias. Otherwise only the current alias is read.
   * A non-200 response from the current alias yields `None`.
   *
   * @param id                   document id, also used for the log marker
   * @param logMessagePart       describes what is being fetched, for log messages
   * @param requestFromIndexName builds the get request for a given index or alias name
   * @param resultTransformer    extracts the result from a successful (HTTP 200) response
   */
  private def migrationAwareGetter[T](
    id: String,
    logMessagePart: String,
    requestFromIndexName: String => GetRequest,
    resultTransformer: GetResponse => Option[T]
  )(
    implicit ex: ExecutionContext
  ): Future[Option[T]] = {
    implicit val logMarker: MarkerMap = MarkerMap("id" -> id)

    def getFromCurrentIndex = executeAndLog(
      request = requestFromIndexName(imagesCurrentAlias),
      message = s"get $logMessagePart by id $id from index with alias $imagesCurrentAlias"
    ).map { r =>
      r.status match {
        case Status.OK => resultTransformer(r.result)
        case _ => None
      }
    }

    migrationStatus match {
      case running: Running =>
        executeAndLog(
          request = requestFromIndexName(running.migrationIndexName),
          message = s"get $logMessagePart by id $id from migration index ${running.migrationIndexName}"
        ).flatMap { r =>
          r.status match {
            case Status.OK => Future.successful(resultTransformer(r.result))
            case _ => getFromCurrentIndex
          }
        }
      case _ => getFromCurrentIndex
    }
  }

  /**
   * Fetches an image by id, along with its raw JSON source and the index it was read from.
   * Returns `None` when no image is found, or when its source cannot be parsed as an [[Image]].
   */
  def getImageWithSourceById(id: String)(implicit ex: ExecutionContext, request: AuthenticatedRequest[AnyContent, Principal]): Future[Option[SourceWrapper[Image]]] = {
    migrationAwareGetter(
      id,
      logMessagePart = "image",
      requestFromIndexName = indexName => get(indexName, id),
      resultTransformer = (result: GetResponse) => mapImageFrom(result.sourceAsString, id, result.index)
    )
  }

  /**
   * Fetches only the `uploadedBy` source field of an image.
   * Returns `None` when the image is not found, or when the field is absent or not a string.
   */
  def getImageUploaderById(id: String)(implicit ex: ExecutionContext): Future[Option[String]] = {
    migrationAwareGetter(
      id,
      logMessagePart = "image uploader",
      requestFromIndexName = indexName => get(indexName, id).fetchSourceInclude("uploadedBy"),
      resultTransformer = _.sourceFieldOpt("uploadedBy").collect { case s: String => s }
    )
  }

  /**
   * Runs an image search.
   *
   * The parsed structured query is combined (AND) with every filter implied by `params`:
   * dates, ids, labels, metadata, cost, usages, syndication status, tier and so on.
   * Sorting and paging also come from `params`.
   *
   * @return the parsed hits keyed by image id, and the total hit count. The total is 0 when
   *         `params.countAll` is explicitly false. The result also carries the org-owned count
   *         when that feature is enabled in config.
   */
  def search(params: SearchParams)(implicit ex: ExecutionContext, request: AuthenticatedRequest[AnyContent, Principal], logMarker: MarkerMap = MarkerMap()): Future[SearchResults] = {
    val isPotentiallyGraphicFieldName = "isPotentiallyGraphic"

    // Parse a hit into an image, surfacing the scripted graphic-content flag (when the hit has
    // script fields) as an extra JSON field alongside the source.
    def resolveHit(hit: SearchHit) = mapImageFrom(
      hit.sourceAsString,
      hit.id,
      hit.index,
      fields = Option(hit.fields).fold(JsObject.empty) { hitFields =>
        Json.obj(
          isPotentiallyGraphicFieldName -> hitFields.get(isPotentiallyGraphicFieldName).map(_.asInstanceOf[List[Boolean]].headOption)
        )
      }
    )

    val query: Query = queryBuilder.makeQuery(params.structuredQuery)

    val uploadTimeFilter = filters.date("uploadTime", params.since, params.until)
    val lastModTimeFilter = filters.date("lastModified", params.modifiedSince, params.modifiedUntil)
    val takenTimeFilter = filters.date("metadata.dateTaken", params.takenSince, params.takenUntil)
    // we only inject filters if there are actual date parameters
    val dateFilterList = List(uploadTimeFilter, lastModTimeFilter, takenTimeFilter).flatten.toNel
    val dateFilter = dateFilterList.map(dateFilters => filters.and(dateFilters.list.toList: _*))

    val idsFilter = params.ids.map(filters.ids)
    val labelFilter = params.labels.toNel.map(filters.terms("labels", _))
    val metadataFilter = params.hasMetadata.map(metadataField).toNel.map(filters.exists)
    val archivedFilter = params.archived.map(filters.existsOrMissing(editsField("archived"), _))
    val hasExports = params.hasExports.map(filters.existsOrMissing("exports", _))
    val hasIdentifier = params.hasIdentifier.map(idName => filters.exists(NonEmptyList(identifierField(idName))))
    val missingIdentifier = params.missingIdentifier.map(idName => filters.missing(NonEmptyList(identifierField(idName))))
    val uploadedByFilter = params.uploadedBy.map(uploadedBy => filters.terms("uploadedBy", NonEmptyList(uploadedBy)))
    val simpleCostFilter = params.free.flatMap(free => if (free) searchFilters.freeFilter else searchFilters.nonFreeFilter)
    val costFilter = params.payType match {
      case Some(PayType.Free) => searchFilters.freeFilter
      case Some(PayType.MaybeFree) => searchFilters.maybeFreeFilter
      case Some(PayType.Pay) => searchFilters.nonFreeFilter
      case _ => None
    }
    val printUsageFilter = params.printUsageFilters.map(searchFilters.printUsageFilters)

    // only an explicit "true" adds a filter; "false" and absent both mean "don't filter"
    val hasRightsCategory = params.hasRightsCategory.filter(_ == true).map(_ => searchFilters.hasRightsCategoryFilter)

    val validityFilter = params.valid.map(valid => if (valid) searchFilters.validFilter else searchFilters.invalidFilter)

    val persistFilter = params.persisted.map {
      case true => searchFilters.persistedFilter
      case false => searchFilters.nonPersistedFilter
    }

    val usageFilter: Iterable[Query] =
      params.usageStatus.toNel.map(status => filters.terms("usagesStatus", status.map(_.toString))).toOption ++
        params.usagePlatform.toNel.map(filters.terms("usagesPlatform", _)).toOption

    val syndicationStatusFilter = params.syndicationStatus.map(status => syndicationFilter.statusFilter(status))

    // Port of special case code in elastic1 sorts. Using the dateAddedToCollection sort implies an
    // additional filter (for reasons unknown): restrict to the first collection path in the query.
    val dateAddedToCollectionFilter = params.orderBy match {
      case Some("dateAddedToCollection") =>
        params.structuredQuery.collectFirst {
          case Match(HierarchyField, Phrase(value)) => termQuery("collections.pathHierarchy", value)
        }
      case _ => None
    }

    // AND together every filter that applies; None when no filter applies at all
    val filterOpt = (
      metadataFilter.toOption.toList ++ persistFilter ++ labelFilter.toOption ++ archivedFilter ++
        uploadedByFilter ++ idsFilter ++ validityFilter ++ simpleCostFilter ++ costFilter ++
        hasExports ++ hasIdentifier ++ missingIdentifier ++ dateFilter.toOption ++
        usageFilter ++ hasRightsCategory ++ searchFilters.tierFilter(params.tier) ++ syndicationStatusFilter ++
        dateAddedToCollectionFilter ++ printUsageFilter
    ).toNel.map(filter => filter.list.toList.reduceLeft(filters.and(_, _))).toOption

    val withFilter = filterOpt.map { f =>
      boolQuery() must (query) filter f
    }.getOrElse(query)

    val sort = params.orderBy match {
      case Some("dateAddedToCollection") => sorts.dateAddedToCollectionDescending
      case _ => sorts.createSort(params.orderBy)
    }

    val runtimeMappings = if (params.syndicationStatus.contains(AwaitingReviewForSyndication) && config.useRuntimeFieldsToFixSyndicationReviewQueueQuery) {
      Seq(syndicationFilter.syndicationReviewQueueFixMapping)
    } else {
      Seq.empty
    }

    // We need to set trackHits to ensure that the total number of hits we return to users is accurate.
    // See https://www.elastic.co/guide/en/elasticsearch/reference/current/breaking-changes-7.0.html#hits-total-now-object-search-response
    val trackTotalHits = params.countAll.getOrElse(true)

    val graphicImagesScriptFields = if (params.shouldFlagGraphicImages) {
      Seq(ScriptField(
        field = isPotentiallyGraphicFieldName,
        // the rest of the logic is in the client (in image.js)
        script = Script(
          //language=groovy -- it's actually painless, but that's pretty similar to groovy and this provides syntax highlighting
          script = "params['_source']?.fileMetadata?.xmp !=null && params['_source']?.fileMetadata?.xmp['pur:adultContentWarning'] != null",
          lang = Some("painless")
        )
      ))
    } else {
      Seq.empty
    }

    val searchRequest = prepareSearch(withFilter)
      .trackTotalHits(trackTotalHits)
      .runtimeMappings(runtimeMappings)
      .storedFields("_source") // this needs to be explicit when using script fields
      .scriptfields(graphicImagesScriptFields)
      .aggregations(if (config.shouldDisplayOrgOwnedCountAndFilterCheckbox) List(filterAgg(
        orgOwnedAggName,
        queryBuilder.makeQuery(Parser.run(s"is:${config.staffPhotographerOrganisation}-owned"))
      )) else Nil)
      .from(params.offset)
      .size(params.length)
      .sortBy(sort)

    executeAndLog(searchRequest, "image search")
      .toMetric(Some(mediaApiMetrics.searchQueries), List(mediaApiMetrics.searchTypeDimension("results")))(_.result.took).map { r =>
        logSearchQueryIfTimedOut(searchRequest, r.result)
        // hits whose source fails to parse are dropped (mapImageFrom logs them)
        val imageHits = r.result.hits.hits.map(resolveHit).toSeq.flatten.map(i => (i.instance.id, i))

        // setting trackTotalHits to false means we don't get any hit count at all.
        // Requester has explicitly opted into not caring about the total hits, so give them what they want (nothing).
        SearchResults(
          hits = imageHits,
          total = if (trackTotalHits) r.result.totalHits else 0,
          maybeOrgOwnedCount = if (config.shouldDisplayOrgOwnedCountAndFilterCheckbox) Some(r.result.aggregations.filter(orgOwnedAggName).docCount) else None
        )
      }
  }

  /**
   * Counts images from the given supplier that have at least one published usage whose
   * `dateAdded` lies between `now-{numDays}d/d` (inclusive) and `now/d` (exclusive).
   *
   * @param id      agency id, resolved through `Agencies.get`
   * @param numDays size of the look-back window, in days
   */
  def usageForSupplier(id: String, numDays: Int)(implicit ex: ExecutionContext, request: AuthenticatedRequest[AnyContent, Principal]): Future[SupplierUsageSummary] = {
    implicit val logMarker: MarkerMap = MarkerMap()
    val supplier = Agencies.get(id)
    val supplierName = supplier.supplier

    val bePublished = termQuery("usages.status", "published")
    val beInLastPeriod = rangeQuery("usages.dateAdded")
      .gte(s"now-${numDays}d/d")
      .lt("now/d")

    // both conditions must hold for the same nested usage document
    val haveUsageInLastPeriod = boolQuery().must(bePublished, beInLastPeriod)

    val beSupplier = termQuery("usageRights.supplier", supplierName)
    val haveNestedUsage = nestedQuery("usages", haveUsageInLastPeriod)

    val query = boolQuery().must(matchAllQuery()).filter(boolQuery().must(beSupplier, haveNestedUsage))

    // only the hit count is needed, so no documents are returned
    val search = prepareSearch(query) size 0

    executeAndLog(search, s"$id usage search").map { r =>
      import r.result
      logSearchQueryIfTimedOut(search, result)
      SupplierUsageSummary(supplier, result.hits.total.value)
    }
  }

  /**
   * Builds a monthly date histogram over `params.field` for images matching `params.structuredQuery`.
   * Months with no matching images are included, because `minDocCount` is 0.
   */
  def dateHistogramAggregate(params: AggregateSearchParams)(implicit ex: ExecutionContext, request: AuthenticatedRequest[AnyContent, Principal]): Future[AggregateSearchResults] = {

    def fromDateHistogramAggregation(name: String, aggregations: Aggregations): Seq[BucketResult] =
      aggregations.result[DateHistogram](name)
        .buckets.map(b => BucketResult(b.date, b.docCount))

    val aggregation = dateHistogramAgg(name = params.field, field = params.field)
      .calendarInterval(DateHistogramInterval.Month)
      .minDocCount(0)
    aggregateSearch(params.field, params, aggregation, fromDateHistogramAggregation)
  }

  /** Terms aggregation over the metadata field named by `params.field`, for images matching the query. */
  def metadataSearch(params: AggregateSearchParams)(implicit ex: ExecutionContext, request: AuthenticatedRequest[AnyContent, Principal]): Future[AggregateSearchResults] = {
    aggregateSearch("metadata", params, termsAgg(name = "metadata", field = metadataField(params.field)), fromTermAggregation)
  }

  /**
   * Terms aggregation over an edits field, for images matching the query.
   *
   * NOTE(review): this always aggregates over the `labels` edits field. `params.field` is only logged.
   */
  def editsSearch(params: AggregateSearchParams)(implicit ex: ExecutionContext, request: AuthenticatedRequest[AnyContent, Principal]): Future[AggregateSearchResults] = {
    logger.info("Edit aggregation requested with params.field: " + params.field)
    val field = "labels" // TODO was - params.field
    aggregateSearch("edits", params, termsAgg(name = "edits", field = editsField(field)), fromTermAggregation)
  }

  /** Extracts (key, doc count) buckets from the terms aggregation called `name`. */
  private def fromTermAggregation(name: String, aggregations: Aggregations): Seq[BucketResult] =
    aggregations.result[Terms](name)
      .buckets.map(b => BucketResult(b.key, b.docCount))

  /**
   * Runs a search that returns no documents, only the given aggregation over images matching
   * `params.structuredQuery`. The aggregation result is then converted to buckets with `extract`.
   *
   * @param name    aggregation name, also used in log messages
   * @param extract converts the named aggregation from the response into bucket results
   */
  private def aggregateSearch(name: String, params: AggregateSearchParams, aggregation: Aggregation, extract: (String, Aggregations) => Seq[BucketResult])(implicit ex: ExecutionContext): Future[AggregateSearchResults] = {
    implicit val logMarker: MarkerMap = MarkerMap()
    logger.info("aggregate search: " + name + " / " + params + " / " + aggregation)
    val query = queryBuilder.makeQuery(params.structuredQuery)
    val search = prepareSearch(query) aggregations aggregation size 0

    executeAndLog(search, s"$name aggregate search")
      .toMetric(Some(mediaApiMetrics.searchQueries), List(mediaApiMetrics.searchTypeDimension("aggregate")))(_.result.took).map { r =>
        logSearchQueryIfTimedOut(search, r.result)
        searchResultToAggregateResponse(r.result, name, extract)
      }
  }

  /** Wraps the extracted buckets together with their count. */
  private def searchResultToAggregateResponse(response: SearchResponse, aggregateName: String, extract: (String, Aggregations) => Seq[BucketResult]): AggregateSearchResults = {
    val results = extract(aggregateName, response.aggregations)
    AggregateSearchResults(results, results.size)
  }

  /**
   * Runs a completion suggester called `name` against the field of the same name, for the text `q`.
   * Duplicate suggestions are skipped.
   *
   * NOTE(review): `size` is never applied to the suggestion, so whatever default Elasticsearch uses
   * determines how many options come back. Confirm whether `.size(size)` was intended.
   * NOTE(review): this request targets the current alias directly rather than going through
   * `prepareSearch`. It is therefore neither migration-aware nor subject to `SearchQueryTimeout`.
   */
  def completionSuggestion(name: String, q: String, size: Int)(implicit ex: ExecutionContext, request: AuthenticatedRequest[AnyContent, Principal]): Future[CompletionSuggestionResults] = {
    implicit val logMarker: MarkerMap = MarkerMap()
    val completionSuggestion = ElasticDsl.completionSuggestion(name, name).text(q).skipDuplicates(true)
    val search = ElasticDsl.search(imagesCurrentAlias) suggestions completionSuggestion
    executeAndLog(search, "completion suggestion query")
      .toMetric(Some(mediaApiMetrics.searchQueries), List(mediaApiMetrics.searchTypeDimension("suggestion-completion")))(_.result.took).map { r =>
        logSearchQueryIfTimedOut(search, r.result)
        val results = r.result.suggestions.get(name).map { suggestions =>
          suggestions.flatMap { s =>
            s.toCompletion.options.map { o =>
              CompletionSuggestionResult(o.text, o.score.toFloat)
            }
          }
        }.getOrElse(Seq.empty)
        CompletionSuggestionResults(results.toList)
      }
  }

  /** Applies [[SearchQueryTimeout]] to a search request. */
  def withSearchQueryTimeout(sr: SearchRequest): SearchRequest = sr timeout SearchQueryTimeout

  /**
   * Builds a migration-aware search for `query`, with [[SearchQueryTimeout]] applied.
   *
   * - `CompletionPreview`: searches only the migration index.
   * - `Running`: searches both the current alias and the migration index. Documents marked as
   *   already migrated to that index are excluded, so each image is returned once.
   * - otherwise: searches only the current alias.
   */
  private def prepareSearch(query: Query): SearchRequest = {
    // Read the migration status exactly once, so the index list and the exclusion filter are
    // derived from the same snapshot even if the status is updated between reads.
    val currentMigrationStatus = migrationStatus

    val (indexes, migrationAwareQuery): (List[String], Query) = currentMigrationStatus match {
      case completionPreview: CompletionPreview =>
        (List(completionPreview.migrationIndexName), query)
      case running: Running =>
        (
          List(imagesCurrentAlias, running.migrationIndexName),
          filters.and(query, filters.mustNot(filters.term("esInfo.migration.migratedTo", running.migrationIndexName)))
        )
      case _ =>
        (List(imagesCurrentAlias), query)
    }

    val searchRequest = ElasticDsl.search(indexes) query migrationAwareQuery
    withSearchQueryTimeout(searchRequest)
  }

  /**
   * Parses a document source into an [[Image]], wrapped with its raw source, its origin index and
   * any extra JSON `fields`. Returns `None` (and logs an error) when the source does not validate
   * as an [[Image]].
   */
  private def mapImageFrom(sourceAsString: String, id: String, fromIndex: String, fields: JsObject = JsObject.empty): Option[SourceWrapper[Image]] = {
    val source = Json.parse(sourceAsString)
    source.validate[Image] match {
      case JsSuccess(image, _) => Some(SourceWrapper(source, image, fromIndex, fields))
      case e: JsError =>
        logger.error("Failed to parse image from source string " + id + ": " + e.toString)
        None
    }
  }

  /** Logs the request at info level when Elasticsearch reports that the search timed out. */
  private def logSearchQueryIfTimedOut(req: SearchRequest, res: SearchResponse): Unit =
    if (res.isTimedOut) logger.info(s"SearchQuery was TimedOut after $SearchQueryTimeout \nquery: ${req.show}")
}