backend/app/services/index/ElasticsearchPages.scala (227 lines of code) (raw):

package services.index

import com.sksamuel.elastic4s.ElasticClient
import com.sksamuel.elastic4s.ElasticDsl._
import com.sksamuel.elastic4s.fields.ObjectField
import com.sksamuel.elastic4s.requests.searches.queries.Query
import com.sksamuel.elastic4s.requests.searches.{HighlightField, MultisearchResponseItem}
import model.index.{Page, PageResult, PagesSummary}
import model.{Language, Languages, Uri}
import services.ElasticsearchSyntax
import services.index.HitReaders.{PageHitReader, RichFieldMap}
import utils.Logging
import utils.attempt.{Attempt, ElasticSearchQueryFailure, MultipleFailures, NotFoundFailure}

import scala.concurrent.ExecutionContext

/**
  * Elasticsearch-backed implementation of [[Pages]].
  *
  * Each page of a resource is stored as its own document in the `<indexNamePrefix>-text` index, with the
  * document id `<uri>-<page number>`. A page document carries the owning resource id, the page number, the
  * page text keyed by language and the page dimensions (width, height, top, bottom).
  *
  * @param client          the elastic4s client used for every request
  * @param indexNamePrefix prefix from which the name of the text index is derived
  * @param ex              execution context used to run and compose the asynchronous requests
  */
class ElasticsearchPages(val client: ElasticClient, indexNamePrefix: String)(implicit val ex: ExecutionContext)
  extends Pages with ElasticsearchSyntax with Logging {

  /** Name of the index holding one document per page. */
  val textIndexName = s"$indexNamePrefix-text"

  /**
    * Creates the text index (if it does not already exist) with the page mapping, then adds the
    * per-language text mapping for every language in `Languages.all`.
    *
    * @return this instance once the index and all language mappings are in place
    */
  override def setup(): Attempt[Pages] = {
    createIndexIfNotAlreadyExists(textIndexName,
      properties(
        keywordField(PagesFields.resourceId),
        intField(PagesFields.page),
        emptyMultiLanguageField(PagesFields.value),
        ObjectField(PagesFields.dimensions, properties = Seq(
          floatField(PagesFields.width),
          floatField(PagesFields.height),
          floatField(PagesFields.top),
          floatField(PagesFields.bottom)
        ))
      )
    ).flatMap { _ =>
      Attempt.sequence(Languages.all.map(addLanguage))
    }.map { _ =>
      this
    }
  }

  /**
    * Extends the mapping of the text index with the page text field for the given language.
    *
    * @param language the language whose text field should be added to the mapping
    */
  def addLanguage(language: Language) = executeNoReturn {
    putMapping(textIndexName).as(
      multiLanguageField(PagesFields.value, language)
    )
  }

  /**
    * Indexes the given pages of a resource in a single bulk request.
    *
    * The document id is `<uri>-<page number>`, so indexing the same page of the same resource again
    * targets the same document id.
    *
    * @param uri   the resource the pages belong to
    * @param pages the pages to index
    */
  override def addPageContents(uri: Uri, pages: Seq[Page]): Attempt[Unit] = {
    val ops = pages.map { page =>
      indexInto(textIndexName)
        .id(s"${uri.value}-${page.page}")
        .fields(
          Map(
            PagesFields.resourceId -> uri.value,
            PagesFields.page -> page.page,
            // Page text is stored per language, keyed by the language key
            PagesFields.value -> page.value.map { case (lang, value) => lang.key -> value },
            s"${PagesFields.dimensions}.${PagesFields.width}" -> page.dimensions.width,
            s"${PagesFields.dimensions}.${PagesFields.height}" -> page.dimensions.height,
            s"${PagesFields.dimensions}.${PagesFields.top}" -> page.dimensions.top,
            s"${PagesFields.dimensions}.${PagesFields.bottom}" -> page.dimensions.bottom
          )
        )
    }

    executeBulk(ops)
  }

  /**
    * Fetches the pages of a resource that overlap the vertical viewport `[top, bottom]`, together with a
    * summary of the whole resource (page count and summed page height).
    *
    * When a highlight query is given, the nearest pages containing highlights before and after the viewport
    * (wrapping around to the last/first highlighted page of the resource) are included as well. The returned
    * pages contain no duplicates and are ordered by ascending page number.
    *
    * @param uri            the resource to read pages from
    * @param top            top edge of the viewport
    * @param bottom         bottom edge of the viewport
    * @param highlightQuery optional query string whose matches should be highlighted
    * @return the summary and pages, or a [[NotFoundFailure]] if no page overlaps the viewport
    */
  override def getTextPages(uri: Uri, top: Double, bottom: Double, highlightQuery: Option[String]): Attempt[PageResult] = for {
    total <- getTotalPageCount(textIndexName, uri)
    totalHeight <- getTotalHeight(textIndexName, uri)
    query = highlightQuery.map(buildQuery)
    highlightFields = query.toList.flatMap { query =>
      HighlightFields.languageHighlighters(PagesFields.value, query)
        // Ensure we get the whole page, not just the highlights
        .map(_.numberOfFragments(0))
    }
    pages <- getPages(textIndexName, uri, top, bottom, query, highlightFields)
    pagesWithNextAndPreviousHighlights <- getPagesWithNextAndPreviousHighlights(textIndexName, uri, highlightQuery, highlightFields, pages)
  } yield {
    PageResult(
      PagesSummary(total, totalHeight),
      // Viewport pages come first so they win over a pointer page with the same page number
      distinctByPageNumber(pages ::: pagesWithNextAndPreviousHighlights)
    )
  }

  /**
    * Fetches a single page of a resource by its page number.
    *
    * @param uri            the resource the page belongs to
    * @param pageNumber     the number of the page to fetch
    * @param highlightQuery optional query string whose matches should be highlighted
    * @return the page, or a [[NotFoundFailure]] if no document exists for that resource and page number
    */
  override def getPage(uri: Uri, pageNumber: Int, highlightQuery: Option[String]): Attempt[Page] = {
    val query = highlightQuery.map(buildQuery)
    val highlightFields = query.toList.flatMap { query =>
      HighlightFields.languageHighlighters(PagesFields.value, query)
        // Ensure we get the whole page, not just the highlights
        .map(_.numberOfFragments(0))
    }

    execute {
      // Look the page up by the same id scheme used when indexing in addPageContents
      search(textIndexName)
        .termQuery("_id", s"${uri.value}-$pageNumber")
        .highlighting(highlightFields)
    }.flatMap { response =>
      response.to[Page].headOption match {
        case Some(page) =>
          Attempt.Right(page)

        case None =>
          Attempt.Left(NotFoundFailure(s"Missing page $pageNumber for $uri"))
      }
    }
  }

  // TODO MRB: collapse total page count and height into fields on the document itself
  // TODO SC/JS: We agree.
  /** Counts the page documents stored for the given resource. */
  private def getTotalPageCount(indexName: String, uri: Uri): Attempt[Long] = {
    execute {
      count(indexName).query(
        termQuery(PagesFields.resourceId, uri.value)
      )
    }.map { resp =>
      resp.count
    }
  }

  /** Sums the `dimensions.height` of every page document stored for the given resource. */
  private def getTotalHeight(indexName: String, uri: Uri): Attempt[Double] = {
    execute {
      search(indexName).query(
        should(matchAllQuery()).filter(termQuery(PagesFields.resourceId, uri.value))
      )
        // Only the aggregation is needed, not the hits themselves
        .size(0)
        .aggs(sumAgg("total_height", s"${PagesFields.dimensions}.${PagesFields.height}"))
    }.map { resp =>
      resp.aggregations.data.objectField("total_height").doubleField("value")
    }
  }

  /**
    * Fetches the pages of a resource that overlap the viewport `[top, bottom]`, ordered by ascending page
    * number.
    *
    * NOTE(review): no explicit `size` is set on the search, so the number of pages returned is capped by
    * the search default hit limit rather than by the viewport - confirm this is intended.
    *
    * @param query           optional query used for scoring; every page in the viewport is returned either way
    *                        because the query is a `should` clause alongside the filters
    * @param highlightFields highlighters to apply to the returned pages
    * @return the matching pages, or a [[NotFoundFailure]] if there are none
    */
  private def getPages(indexName: String, uri: Uri, top: Double, bottom: Double, query: Option[Query], highlightFields: List[HighlightField]): Attempt[List[Page]] = {
    // A page overlaps the viewport if it ends below the viewport top and starts above the viewport bottom
    val rangeFilters = List(
      rangeQuery(s"${PagesFields.dimensions}.${PagesFields.bottom}").gt(top),
      rangeQuery(s"${PagesFields.dimensions}.${PagesFields.top}").lt(bottom)
    )

    val filters = List(termQuery(PagesFields.resourceId, uri.value)) ++ rangeFilters

    // TODO MRB: removed the size parameter so can bring in total count and agg size to a single query
    // TODO SC/JS: Not sure about this? Maybe just denormalize the page height/count?
    execute {
      search(indexName).query(
        should(query.getOrElse(matchAllQuery()))
          .filter(filters)
      )
        .sortBy(fieldSort(PagesFields.page).asc())
        .highlighting(highlightFields)
    }.flatMap { resp =>
      val pages = resp.to[Page].toList

      if (pages.isEmpty) {
        // TODO: this could also occur if the viewport passed from the client didn't match any pages
        // strictly speaking should this only be a 404 if no pages are found for the given blob ID?
        // In practice we don't think it matters and we have full control of the client code at the moment so can always
        // change the response here as needed
        Attempt.Left(NotFoundFailure(s"No pages found for ${uri.value}"))
      } else {
        Attempt.Right(pages)
      }
    }
  }

  /**
    * Finds the pages a client should jump to when moving to the previous or next highlight from the
    * current viewport.
    *
    * Four searches are issued in one multi-search: the first and last highlighted pages of the whole
    * resource, the last highlighted page before the viewport and the first highlighted page after it.
    * If there is no highlighted page before (or after) the viewport, the last (or first) highlighted page
    * of the resource is used instead so that navigation wraps around.
    *
    * @param highlightQuery  the query string to highlight; when absent no pages are returned
    * @param pagesInViewport the pages currently shown; when empty no pages are returned
    * @return at most two distinct pages, or a [[MultipleFailures]] if any of the searches failed
    */
  private def getPagesWithNextAndPreviousHighlights(indexName: String, uri: Uri, highlightQuery: Option[String], highlightFields: List[HighlightField], pagesInViewport: List[Page]): Attempt[List[Page]] = {
    highlightQuery match {
      case None =>
        Attempt.Right(List.empty)

      case Some(_) if pagesInViewport.isEmpty =>
        // Without a viewport there is no "previous" or "next"; this also keeps minBy/maxBy below total
        Attempt.Right(List.empty)

      case Some(q) =>
        val firstPageInViewport = pagesInViewport.minBy(_.page).page
        val lastPageInViewport = pagesInViewport.maxBy(_.page).page

        val query = buildQuery(q)
        val documentFilter = termQuery(PagesFields.resourceId, uri.value)

        execute {
          multi(
            // The first page to contain highlights
            search(indexName)
              .sortByFieldAsc(PagesFields.page)
              .highlighting(highlightFields)
              .size(1)
              .query(
                // NB this is a "must" rather than a "should" so we only return pages containing highlights
                must(query).filter(documentFilter)
              ),
            // The last page to contain highlights PRIOR to the FIRST page in the viewport
            search(indexName)
              .sortByFieldDesc(PagesFields.page)
              .highlighting(highlightFields)
              .size(1)
              .query(
                must(query).filter(
                  documentFilter,
                  rangeQuery(PagesFields.page).lt(firstPageInViewport)
                )
              ),
            // The first page to contain highlights AFTER the LAST page in the viewport
            search(indexName)
              .sortByFieldAsc(PagesFields.page)
              .highlighting(highlightFields)
              .size(1)
              .query(
                must(query).filter(
                  documentFilter,
                  rangeQuery(PagesFields.page).gt(lastPageInViewport)
                )
              ),
            // The last page to contain highlights
            search(indexName)
              .sortByFieldDesc(PagesFields.page)
              .highlighting(highlightFields)
              .size(1)
              .query(
                must(query).filter(documentFilter)
              )
          )
        }.flatMap { response =>
          val results = response.items.collect {
            case MultisearchResponseItem(_, _, Right(result)) => result
          }

          val errors = response.items.collect {
            case MultisearchResponseItem(_, status, Left(err)) =>
              ElasticSearchQueryFailure(new IllegalStateException(err.toString), status, None)
          }

          if (errors.nonEmpty) {
            // Any failed sub-search fails the whole lookup
            Attempt.Left(MultipleFailures(errors.toList))
          } else {
            val pointers = results.flatMap(_.hits.hits).map(_.to[Page]).sortBy(_.page)

            if (pointers.isEmpty) {
              Attempt.Right(List.empty)
            } else {
              val min = pointers.head
              val max = pointers.last

              val maybePrior = pointers.reverse.find(_.page < firstPageInViewport)
              val maybeAfter = pointers.find(_.page > lastPageInViewport)

              // The first and last are used if you need to wrap around, e.g. you're on highlight 0 and you press "previous"
              // you should be taken to the last highlight in the entire document
              val prior = maybePrior.getOrElse(max)
              val after = maybeAfter.getOrElse(min)

              Attempt.Right(distinctByPageNumber(List(after, prior)))
            }
          }
        }
    }
  }

  /**
    * Removes pages with a duplicate page number, keeping the first occurrence of each, and returns the
    * remaining pages ordered by ascending page number.
    *
    * The explicit sort matters: the grouping step yields its groups in an unspecified order, which would
    * otherwise discard the page ordering of the input.
    */
  private def distinctByPageNumber(pages: List[Page]): List[Page] = pages
    .groupBy(_.page)
    .values
    // Groups are never empty and keep the relative order of the input, so head is the first occurrence
    .map(_.head)
    .toList
    .sortBy(_.page)

  /**
    * Builds the query-string query used for highlighting: all terms are required ("and"), every language
    * sub-field of the page text is searched, and quoted phrases are matched against the `.exact` sub-fields.
    */
  private def buildQuery(q: String) = queryStringQuery(q)
    .defaultOperator("and")
    .field(s"${PagesFields.value}.*")
    .quoteFieldSuffix(".exact")
}

/** Field names used in the page documents of the text index. */
object PagesFields {
  val resourceId = "resourceId"
  val page = "page"
  val value = "value"
  val dimensions = "dimensions"
  val width = "width"
  val height = "height"
  val top = "top"
  val bottom = "bottom"
  val total = "total"
}