media-api/app/lib/elasticsearch/ElasticSearch.scala (323 lines of code) (raw):
package lib.elasticsearch
import org.apache.pekko.actor.Scheduler
import com.gu.mediaservice.lib.ImageFields
import com.gu.mediaservice.lib.elasticsearch.filters
import com.gu.mediaservice.lib.auth.Authentication.Principal
import com.gu.mediaservice.lib.elasticsearch.{CompletionPreview, ElasticSearchClient, ElasticSearchConfig, MigrationStatusProvider, Running}
import com.gu.mediaservice.lib.logging.{GridLogging, MarkerMap}
import com.gu.mediaservice.lib.metrics.FutureSyntax
import com.gu.mediaservice.model.{Agencies, Agency, AwaitingReviewForSyndication, Image}
import com.sksamuel.elastic4s.ElasticDsl
import com.sksamuel.elastic4s.ElasticDsl._
import com.sksamuel.elastic4s.requests.get.{GetRequest, GetResponse}
import com.sksamuel.elastic4s.requests.script.{Script, ScriptField}
import com.sksamuel.elastic4s.requests.searches._
import com.sksamuel.elastic4s.requests.searches.aggs.Aggregation
import com.sksamuel.elastic4s.requests.searches.aggs.responses.Aggregations
import com.sksamuel.elastic4s.requests.searches.aggs.responses.bucket.{DateHistogram, Terms}
import com.sksamuel.elastic4s.requests.searches.queries.Query
import lib.querysyntax.{HierarchyField, Match, Parser, Phrase}
import lib.{MediaApiConfig, MediaApiMetrics, SupplierUsageSummary}
import play.api.libs.json.{JsError, JsObject, JsSuccess, Json}
import play.api.mvc.AnyContent
import play.api.mvc.Security.AuthenticatedRequest
import play.mvc.Http.Status
import scalaz.NonEmptyList
import scalaz.syntax.std.list._
import java.util.concurrent.TimeUnit
import scala.concurrent.duration.FiniteDuration
import scala.concurrent.{ExecutionContext, Future}
/**
 * Elasticsearch-backed read access to images for the media API: lookup by id,
 * filtered search, aggregations and completion suggestions.
 *
 * @param config            media-api configuration; passed to the filter/query builders and read for feature flags
 * @param mediaApiMetrics   metrics sink used to record how long search queries took
 * @param elasticConfig     source of the index aliases, url, shard and replica settings
 * @param overQuotaAgencies supplier of the agencies currently over quota; handed to the query builder
 * @param scheduler         not referenced directly in this class body; presumably required by a mixed-in trait (TODO confirm)
 */
class ElasticSearch(
val config: MediaApiConfig,
mediaApiMetrics: MediaApiMetrics,
elasticConfig: ElasticSearchConfig,
overQuotaAgencies: () => List[Agency],
val scheduler: Scheduler
) extends ElasticSearchClient with ImageFields with MatchFields with FutureSyntax with GridLogging with MigrationStatusProvider {
// Name of the filter aggregation that `search` uses to count organisation-owned images.
private val orgOwnedAggName = "org-owned"
// Index aliases and cluster settings, all read from the injected ElasticSearchConfig.
lazy val imagesCurrentAlias = elasticConfig.aliases.current
lazy val imagesMigrationAlias = elasticConfig.aliases.migration
lazy val url = elasticConfig.url
lazy val shards = elasticConfig.shards
lazy val replicas = elasticConfig.replicas
// Timeout applied to search requests by `withSearchQueryTimeout`.
private val SearchQueryTimeout = FiniteDuration(10, TimeUnit.SECONDS)
// there is a 15 second timeout set at the cluster level as well
/**
* In terms of the search query timeout in Grid,
* there is an additional config, `allow_partial_search_results`,
* which is set to true by default.
* This means, for example, that if I ask ES for photos that have field foo=bar without a timeout, it may give me 6500 results;
* if I run the same query with a 1ms timeout, it may give me, for example, only 4000 results instead.
**/
// Provides the cost, validity, persistence, tier, rights-category and print-usage filters used by `search`.
val searchFilters = new SearchFilters(config)
// Provides the syndication status filter and the runtime mapping used by `search`.
val syndicationFilter = new SyndicationFilter(config)
// Turns a parsed structured query into an Elasticsearch query.
val queryBuilder = new QueryBuilder(matchFields, overQuotaAgencies, config)
/**
 * Fetches a single image by id, discarding the raw source it was parsed from.
 *
 * @param id the image id
 * @return the parsed image, or None when `getImageWithSourceById` yields nothing
 */
def getImageById(id: String)(implicit ex: ExecutionContext, request: AuthenticatedRequest[AnyContent, Principal]): Future[Option[Image]] =
  for {
    maybeWrapped <- getImageWithSourceById(id)
  } yield maybeWrapped.map(wrapped => wrapped.instance)
/**
 * Gets a document by id, taking a running migration into account.
 *
 * While the migration status is `Running`, the migration index is queried first; if that
 * request does not come back with status OK, the index behind the current alias is queried.
 * In every other migration state only the current alias is queried.
 *
 * @param id                   the document id
 * @param logMessagePart       short description of what is being fetched, used in the log message
 * @param requestFromIndexName builds the get request for a given index name
 * @param resultTransformer    converts an OK response into the desired result
 * @return the transformed result, or None when the current index does not answer with OK
 */
private def migrationAwareGetter[T](
  id: String,
  logMessagePart: String,
  requestFromIndexName: String => GetRequest,
  resultTransformer: GetResponse => Option[T]
)(
  implicit ex: ExecutionContext
): Future[Option[T]] = {
  implicit val logMarker: MarkerMap = MarkerMap("id" -> id)

  // Kept as a `def` so the request against the current alias is only issued when needed.
  def fetchFromCurrentAlias: Future[Option[T]] =
    executeAndLog(
      request = requestFromIndexName(imagesCurrentAlias),
      message = s"get $logMessagePart by id $id from index with alias $imagesCurrentAlias"
    ).map { response =>
      if (response.status == Status.OK) resultTransformer(response.result) else None
    }

  migrationStatus match {
    case inProgress: Running =>
      executeAndLog(
        request = requestFromIndexName(inProgress.migrationIndexName),
        message = s"get $logMessagePart by id $id from migration index ${inProgress.migrationIndexName}"
      ).flatMap { response =>
        if (response.status == Status.OK) Future.successful(resultTransformer(response.result))
        else fetchFromCurrentAlias
      }
    case _ =>
      fetchFromCurrentAlias
  }
}
/**
 * Fetches an image by id together with the raw source it was parsed from and the
 * index that served it.
 *
 * @param id the image id
 * @return the wrapped image, or None when the lookup yields nothing or the source does not parse as an Image
 */
def getImageWithSourceById(id: String)(implicit ex: ExecutionContext, request: AuthenticatedRequest[AnyContent, Principal]): Future[Option[SourceWrapper[Image]]] = {
  val buildRequest: String => GetRequest = indexName => get(indexName, id)
  val wrapSource: GetResponse => Option[SourceWrapper[Image]] =
    response => mapImageFrom(response.sourceAsString, id, response.index)

  migrationAwareGetter(
    id,
    logMessagePart = "image",
    requestFromIndexName = buildRequest,
    resultTransformer = wrapSource
  )
}
/**
 * Fetches only the `uploadedBy` field of an image.
 *
 * @param id the image id
 * @return the uploader, or None when the lookup yields nothing or the field is absent / not a string
 */
def getImageUploaderById(id: String)(implicit ex: ExecutionContext): Future[Option[String]] = {
  // Only string values are accepted; anything else is treated as missing.
  def uploaderFrom(response: GetResponse): Option[String] =
    response.sourceFieldOpt("uploadedBy") match {
      case Some(uploader: String) => Some(uploader)
      case _ => None
    }

  migrationAwareGetter(
    id,
    logMessagePart = "image uploader",
    requestFromIndexName = indexName => get(indexName, id).fetchSourceInclude("uploadedBy"),
    resultTransformer = response => uploaderFrom(response)
  )
}
/**
 * Runs the main image search.
 *
 * The structured query from `params` is combined with every optional filter that `params`
 * asks for (dates, ids, labels, metadata, cost, validity, persistence, usage, syndication, ...),
 * then sorted and paged. Hits whose source cannot be parsed as an Image are dropped from the result.
 *
 * @param params the search parameters
 * @return the matching images keyed by id, the total hit count (0 when hit tracking is disabled)
 *         and, when enabled in config, the number of organisation-owned matches
 */
def search(params: SearchParams)(implicit ex: ExecutionContext, request: AuthenticatedRequest[AnyContent, Principal], logMarker:MarkerMap = MarkerMap()): Future[SearchResults] = {
val isPotentiallyGraphicFieldName = "isPotentiallyGraphic"
// Converts a raw hit into a wrapped image, carrying over the script field (if any) as extra `fields`.
def resolveHit(hit: SearchHit) = mapImageFrom(
hit.sourceAsString,
hit.id,
hit.index,
// hit.fields is guarded against null here; in that case no extra fields are attached
fields = hit.fields match {
case null => JsObject.empty
case _ => Json.obj(
isPotentiallyGraphicFieldName -> hit.fields.get(isPotentiallyGraphicFieldName).map(_.asInstanceOf[List[Boolean]].headOption)
)
}
)
val query: Query = queryBuilder.makeQuery(params.structuredQuery)
val uploadTimeFilter = filters.date("uploadTime", params.since, params.until)
val lastModTimeFilter = filters.date("lastModified", params.modifiedSince, params.modifiedUntil)
val takenTimeFilter = filters.date("metadata.dateTaken", params.takenSince, params.takenUntil)
// we only inject filters if there are actual date parameters
val dateFilterList = List(uploadTimeFilter, lastModTimeFilter, takenTimeFilter).flatten.toNel
val dateFilter = dateFilterList.map(dateFilters => filters.and(dateFilters.list.toList: _*))
val idsFilter = params.ids.map(filters.ids)
val labelFilter = params.labels.toNel.map(filters.terms("labels", _))
val metadataFilter = params.hasMetadata.map(metadataField).toNel.map(filters.exists)
val archivedFilter = params.archived.map(filters.existsOrMissing(editsField("archived"), _))
val hasExports = params.hasExports.map(filters.existsOrMissing("exports", _))
val hasIdentifier = params.hasIdentifier.map(idName => filters.exists(NonEmptyList(identifierField(idName))))
val missingIdentifier = params.missingIdentifier.map(idName => filters.missing(NonEmptyList(identifierField(idName))))
val uploadedByFilter = params.uploadedBy.map(uploadedBy => filters.terms("uploadedBy", NonEmptyList(uploadedBy)))
// Two independent cost inputs: the boolean `free` flag and the `payType` value. Both are applied when present.
val simpleCostFilter = params.free.flatMap(free => if (free) searchFilters.freeFilter else searchFilters.nonFreeFilter)
val costFilter = params.payType match {
case Some(PayType.Free) => searchFilters.freeFilter
case Some(PayType.MaybeFree) => searchFilters.maybeFreeFilter
case Some(PayType.Pay) => searchFilters.nonFreeFilter
case _ => None
}
val printUsageFilter = params.printUsageFilters.map(searchFilters.printUsageFilters)
// Only `hasRightsCategory = true` adds a filter; `false` is treated the same as absent.
val hasRightsCategory = params.hasRightsCategory.filter(_ == true).map(_ => searchFilters.hasRightsCategoryFilter)
val validityFilter = params.valid.map(valid => if (valid) searchFilters.validFilter else searchFilters.invalidFilter)
val persistFilter = params.persisted map {
case true => searchFilters.persistedFilter
case false => searchFilters.nonPersistedFilter
}
val usageFilter: Iterable[Query] =
params.usageStatus.toNel.map(status => filters.terms("usagesStatus", status.map(_.toString))).toOption ++
params.usagePlatform.toNel.map(filters.terms("usagesPlatform", _)).toOption
val syndicationStatusFilter = params.syndicationStatus.map(status => syndicationFilter.statusFilter(status))
// Port of special case code in elastic1 sorts. Using the dateAddedToCollection sort implies an additional filter for reasons unknown
val dateAddedToCollectionFilter = {
params.orderBy match {
case Some("dateAddedToCollection") => {
// Uses the first collection phrase found in the structured query, if there is one.
val pathHierarchyOpt = params.structuredQuery.flatMap {
case Match(HierarchyField, Phrase(value)) => Some(value)
case _ => None
}.headOption
pathHierarchyOpt.map { pathHierarchy =>
termQuery("collections.pathHierarchy", pathHierarchy)
}
}
case _ => None
}
}
// All filters that are present are ANDed together; None when no filter applies.
val filterOpt = (
metadataFilter.toOption.toList
++ persistFilter
++ labelFilter.toOption
++ archivedFilter
++ uploadedByFilter
++ idsFilter
++ validityFilter
++ simpleCostFilter
++ costFilter
++ hasExports
++ hasIdentifier
++ missingIdentifier
++ dateFilter.toOption
++ usageFilter
++ hasRightsCategory
++ searchFilters.tierFilter(params.tier)
++ syndicationStatusFilter
++ dateAddedToCollectionFilter
++ printUsageFilter
).toNel.map(filter => filter.list.toList.reduceLeft(filters.and(_, _))).toOption
// With no filters the bare query is used as-is.
val withFilter = filterOpt.map { f =>
boolQuery() must (query) filter f
}.getOrElse(query)
val sort = params.orderBy match {
case Some("dateAddedToCollection") => sorts.dateAddedToCollectionDescending
case _ => sorts.createSort(params.orderBy)
}
// The runtime mapping is only added for the syndication review queue, and only when enabled in config.
val runtimeMappings = if (params.syndicationStatus.contains(AwaitingReviewForSyndication) && config.useRuntimeFieldsToFixSyndicationReviewQueueQuery) {
Seq(syndicationFilter.syndicationReviewQueueFixMapping)
} else {
Seq.empty
}
// We need to set trackHits to ensure that the total number of hits we return to users is accurate.
// See https://www.elastic.co/guide/en/elasticsearch/reference/current/breaking-changes-7.0.html#hits-total-now-object-search-response
val trackTotalHits = params.countAll.getOrElse(true)
val graphicImagesScriptFields =
if (params.shouldFlagGraphicImages) {
Seq(ScriptField(
field = isPotentiallyGraphicFieldName,
// the rest of the logic is in the client (in image.js)
script = Script(
//language=groovy -- it's actually painless, but that's pretty similar to groovy and this provides syntax highlighting
script = "params['_source']?.fileMetadata?.xmp !=null && params['_source']?.fileMetadata?.xmp['pur:adultContentWarning'] != null",
lang = Some("painless")
)
))
} else {
Seq.empty
}
val searchRequest = prepareSearch(withFilter)
.trackTotalHits(trackTotalHits)
.runtimeMappings(runtimeMappings)
.storedFields("_source") // this needs to be explicit when using script fields
.scriptfields(graphicImagesScriptFields)
// The org-owned count is computed as a filter aggregation alongside the main query.
.aggregations(if (config.shouldDisplayOrgOwnedCountAndFilterCheckbox) List(filterAgg(
orgOwnedAggName,
queryBuilder.makeQuery(Parser.run(s"is:${config.staffPhotographerOrganisation}-owned"))
)) else Nil)
.from(params.offset)
.size(params.length)
.sortBy(sort)
executeAndLog(searchRequest, "image search").
toMetric(Some(mediaApiMetrics.searchQueries), List(mediaApiMetrics.searchTypeDimension("results")))(_.result.took).map { r =>
logSearchQueryIfTimedOut(searchRequest, r.result)
// Hits that fail to parse yield None from resolveHit and are removed by flatten.
val imageHits = r.result.hits.hits.map(resolveHit).toSeq.flatten.map(i => (i.instance.id, i))
// setting trackTotalHits to false means we don't get any hit count at all.
// Requester has explicitly opted into not caring about the total hits, so give them what they want (nothing).
SearchResults(
hits = imageHits,
total = if (trackTotalHits) r.result.totalHits else 0,
maybeOrgOwnedCount =
if (config.shouldDisplayOrgOwnedCountAndFilterCheckbox)
Some(r.result.aggregations.filter(orgOwnedAggName).docCount)
else
None
)
}
}
/**
 * Counts the images from a supplier that have a published usage added within the last
 * `numDays` whole days (excluding today).
 *
 * @param id      the agency id, resolved through `Agencies.get`
 * @param numDays how many days back the usage window starts
 * @return the agency together with the number of matching images
 */
def usageForSupplier(id: String, numDays: Int)(implicit ex: ExecutionContext, request: AuthenticatedRequest[AnyContent, Principal]): Future[SupplierUsageSummary] = {
  implicit val logMarker: MarkerMap = MarkerMap()

  val supplier = Agencies.get(id)

  // A usage qualifies when it is published and was added inside the window.
  val publishedInWindow = boolQuery().must(
    termQuery("usages.status", "published"),
    rangeQuery("usages.dateAdded").gte(s"now-${numDays}d/d").lt("now/d")
  )
  val fromSupplierWithUsage = boolQuery().must(
    termQuery("usageRights.supplier", supplier.supplier),
    nestedQuery("usages", publishedInWindow)
  )
  val usageQuery = boolQuery().must(matchAllQuery()).filter(fromSupplierWithUsage)

  // Only the hit count is needed, so no documents are requested.
  val usageSearch = prepareSearch(usageQuery).size(0)

  executeAndLog(usageSearch, s"$id usage search").map { response =>
    val searchResult = response.result
    logSearchQueryIfTimedOut(usageSearch, searchResult)
    SupplierUsageSummary(supplier, searchResult.hits.total.value)
  }
}
/**
 * Buckets the images matching `params` by calendar month of the date field `params.field`.
 * Months without documents are included (minimum document count is 0).
 *
 * @param params aggregate search parameters; `field` names both the aggregation and the date field
 * @return one bucket per month with its document count
 */
def dateHistogramAggregate(params: AggregateSearchParams)(implicit ex: ExecutionContext, request: AuthenticatedRequest[AnyContent, Principal]): Future[AggregateSearchResults] = {
  val histogramName = params.field

  def toBuckets(name: String, aggregations: Aggregations): Seq[BucketResult] = {
    val histogram = aggregations.result[DateHistogram](name)
    histogram.buckets.map(bucket => BucketResult(bucket.date, bucket.docCount))
  }

  val monthlyHistogram = dateHistogramAgg(name = histogramName, field = histogramName)
    .calendarInterval(DateHistogramInterval.Month)
    .minDocCount(0)

  aggregateSearch(histogramName, params, monthlyHistogram, toBuckets)
}
/**
 * Terms aggregation over the metadata field named by `params.field`, restricted to the
 * images matching `params`.
 */
def metadataSearch(params: AggregateSearchParams)(implicit ex: ExecutionContext, request: AuthenticatedRequest[AnyContent, Principal]): Future[AggregateSearchResults] = {
  val byMetadataField = termsAgg(name = "metadata", field = metadataField(params.field))
  aggregateSearch("metadata", params, byMetadataField, fromTermAggregation)
}
/**
 * Terms aggregation over the edits `labels` field, restricted to the images matching `params`.
 * `params.field` is logged but not used to pick the field; the field is fixed to "labels".
 */
def editsSearch(params: AggregateSearchParams)(implicit ex: ExecutionContext, request: AuthenticatedRequest[AnyContent, Principal]): Future[AggregateSearchResults] = {
  logger.info(s"Edit aggregation requested with params.field: ${params.field}")
  val aggregatedField = "labels" // TODO was - params.field
  val byEditsField = termsAgg(name = "edits", field = editsField(aggregatedField))
  aggregateSearch("edits", params, byEditsField, fromTermAggregation)
}
/** Reads the terms aggregation called `name` and converts each bucket to a key/count pair. */
private def fromTermAggregation(name: String, aggregations: Aggregations): Seq[BucketResult] = {
  val terms = aggregations.result[Terms](name)
  terms.buckets.map(bucket => BucketResult(bucket.key, bucket.docCount))
}
/**
 * Runs `aggregation` over the images matching `params` and converts the outcome with `extract`.
 * No documents are fetched (size 0); the time the query took is recorded as a metric.
 *
 * @param name        aggregation name, used for logging and to look the aggregation up in the response
 * @param params      supplies the structured query that scopes the aggregation
 * @param aggregation the aggregation to execute
 * @param extract     turns the named aggregation in the response into buckets
 */
private def aggregateSearch(name: String, params: AggregateSearchParams, aggregation: Aggregation, extract: (String, Aggregations) => Seq[BucketResult])(implicit ex: ExecutionContext): Future[AggregateSearchResults] = {
  implicit val logMarker: MarkerMap = MarkerMap()
  logger.info(s"aggregate search: $name / $params / $aggregation")

  val scopedQuery = queryBuilder.makeQuery(params.structuredQuery)
  val aggregateRequest = prepareSearch(scopedQuery).aggregations(aggregation).size(0)

  executeAndLog(aggregateRequest, s"$name aggregate search")
    .toMetric(Some(mediaApiMetrics.searchQueries), List(mediaApiMetrics.searchTypeDimension("aggregate")))(_.result.took)
    .map { response =>
      val searchResult = response.result
      logSearchQueryIfTimedOut(aggregateRequest, searchResult)
      searchResultToAggregateResponse(searchResult, name, extract)
    }
}
/** Extracts the buckets of the named aggregation and reports them with their count as the total. */
private def searchResultToAggregateResponse(response: SearchResponse, aggregateName: String, extract: (String, Aggregations) => Seq[BucketResult]): AggregateSearchResults = {
  val allAggregations = response.aggregations
  val buckets = extract(aggregateName, allAggregations)
  val bucketCount = buckets.size
  AggregateSearchResults(buckets, bucketCount)
}
/**
 * Asks Elasticsearch for completion suggestions on the field `name` for the text `q`.
 * Duplicate suggestions are skipped. The query runs against the current images alias only.
 *
 * @param name the completion field; also used as the suggestion's name in the response
 * @param q    the text to complete
 * @param size the maximum number of suggestions to request
 * @return the suggested texts with their scores; empty when the response holds no suggestion called `name`
 */
def completionSuggestion(name: String, q: String, size: Int)(implicit ex: ExecutionContext, request: AuthenticatedRequest[AnyContent, Principal]): Future[CompletionSuggestionResults] = {
  implicit val logMarker: MarkerMap = MarkerMap()

  // Forward the caller's `size`; without it Elasticsearch falls back to its own default
  // number of options and the parameter has no effect.
  val suggestion = ElasticDsl.completionSuggestion(name, name)
    .text(q)
    .skipDuplicates(true)
    .size(size)
  val search = ElasticDsl.search(imagesCurrentAlias).suggestions(suggestion)

  executeAndLog(search, "completion suggestion query").
    toMetric(Some(mediaApiMetrics.searchQueries), List(mediaApiMetrics.searchTypeDimension("suggestion-completion")))(_.result.took).map { r =>
      logSearchQueryIfTimedOut(search, r.result)
      val options = r.result.suggestions.get(name).map { suggestions =>
        suggestions.flatMap { s =>
          s.toCompletion.options.map { o =>
            CompletionSuggestionResult(o.text, o.score.toFloat)
          }
        }
      }.getOrElse(Seq.empty)
      CompletionSuggestionResults(options.toList)
    }
}
/** Returns `sr` with this class's search query timeout applied. */
def withSearchQueryTimeout(sr: SearchRequest): SearchRequest =
  sr.timeout(SearchQueryTimeout)
/**
 * Builds the base search request for `query`, choosing indexes according to the migration status
 * and applying the search query timeout.
 *
 *  - `CompletionPreview`: search the migration index only.
 *  - `Running`: search both the current alias and the migration index, excluding documents whose
 *    `esInfo.migration.migratedTo` equals the migration index name.
 *  - anything else: search the current alias only.
 *
 * @param query the query to run
 * @return a search request over the selected indexes with the timeout set
 */
private def prepareSearch(query: Query): SearchRequest = {
  // Read the migration status exactly once so the index list and the exclusion filter always
  // come from the same snapshot. Two separate reads could disagree if the status changes in
  // between (NOTE(review): assumes `migrationStatus` can change over time - confirm in
  // MigrationStatusProvider).
  val (indexes, migrationAwareQuery): (List[String], Query) = migrationStatus match {
    case completionPreview: CompletionPreview =>
      (List(completionPreview.migrationIndexName), query)
    case running: Running =>
      val notYetMigrated = filters.mustNot(filters.term("esInfo.migration.migratedTo", running.migrationIndexName))
      (List(imagesCurrentAlias, running.migrationIndexName), filters.and(query, notYetMigrated))
    case _ =>
      (List(imagesCurrentAlias), query)
  }
  val searchRequest = ElasticDsl.search(indexes) query migrationAwareQuery
  withSearchQueryTimeout(searchRequest)
}
/**
 * Parses a document source into an Image and wraps it with its raw JSON, originating index
 * and any extra fields. Validation failures are logged and yield None.
 *
 * @param sourceAsString the document source as a JSON string
 * @param id             the document id, used in the error log only
 * @param fromIndex      the index the document came from
 * @param fields         extra fields to attach to the wrapper
 */
private def mapImageFrom(sourceAsString: String, id: String, fromIndex: String, fields: JsObject = JsObject.empty) = {
  val parsedSource = Json.parse(sourceAsString)
  parsedSource.validate[Image] match {
    case JsSuccess(image, _) =>
      Some(SourceWrapper(parsedSource, image, fromIndex, fields))
    case failure: JsError =>
      logger.error(s"Failed to parse image from source string $id: $failure")
      None
  }
}
/** Logs the rendered request at info level when the response reports that the search timed out. */
private def logSearchQueryIfTimedOut(req: SearchRequest, res: SearchResponse) = {
  val timedOut = res.isTimedOut
  if (timedOut) {
    logger.info(s"SearchQuery was TimedOut after $SearchQueryTimeout \nquery: ${req.show}")
  }
}
}