thrall/app/lib/elasticsearch/ElasticSearch.scala (653 lines of code) (raw):

package lib.elasticsearch import org.apache.pekko.actor.Scheduler import com.gu.mediaservice.lib.ImageFields import com.gu.mediaservice.lib.elasticsearch.filters import com.gu.mediaservice.lib.elasticsearch.{ElasticSearchClient, ElasticSearchConfig, ElasticSearchExecutions, ReapableEligibility, Running} import com.gu.mediaservice.lib.formatting.printDateTime import com.gu.mediaservice.lib.logging.{LogMarker, MarkerMap} import com.gu.mediaservice.model._ import com.gu.mediaservice.model.leases.MediaLease import com.gu.mediaservice.model.usage.{Usage, UsageNotice} import com.gu.mediaservice.syntax._ import com.sksamuel.elastic4s.ElasticDsl._ import com.sksamuel.elastic4s.requests.script.Script import com.sksamuel.elastic4s.requests.searches.SearchResponse import com.sksamuel.elastic4s.requests.searches.queries.Query import com.sksamuel.elastic4s.requests.searches.queries.compound.BoolQuery import com.sksamuel.elastic4s.requests.searches.sort.SortOrder import com.sksamuel.elastic4s.requests.update.UpdateRequest import com.sksamuel.elastic4s.{ElasticDsl, Executor, Functor, Handler, Response} import lib.{BatchDeletionIds, ThrallMetrics} import org.joda.time.DateTime import play.api.libs.json.JsValue.jsValueToJsLookup import play.api.libs.json._ import scala.annotation.nowarn import scala.concurrent.duration.DurationInt import scala.concurrent.{ExecutionContext, Future} object ImageNotDeletable extends Throwable("Image cannot be deleted") class ElasticSearch( config: ElasticSearchConfig, metrics: Option[ThrallMetrics], val scheduler: Scheduler ) extends ElasticSearchClient with ImageFields with ElasticSearchExecutions with ThrallMigrationClient { lazy val imagesCurrentAlias: String = config.aliases.current lazy val imagesMigrationAlias: String = config.aliases.migration lazy val url: String = config.url lazy val shards: Int = config.shards lazy val replicas: Int = config.replicas def migrationAwareUpdater[REQUEST, RESPONSE]( requestFromIndexName: String => REQUEST, logMessageFromIndexName: String => String, notFoundSuccessful: Boolean = false, )(implicit ex: ExecutionContext, functor: Functor[Future], executor: Executor[Future], handler: Handler[REQUEST, RESPONSE], manifest: Manifest[RESPONSE], logMarkers: LogMarker ): Future[Response[RESPONSE]] = { // if doc does not exist in migration index, ignore (ie. mark as successful). // coalesce all other errors. val runForCurrentIndex: Future[Option[Response[RESPONSE]]] = executeAndLog(requestFromIndexName(imagesCurrentAlias), logMessageFromIndexName(imagesCurrentAlias), notFoundSuccessful).map(Some(_)) // Update requests to the alias throw if the alias does not exist, but the exception is very generic and not cause is not obvious // ("index names must be all upper case") val runForMigrationIndex: Future[Option[Response[RESPONSE]]] = migrationStatus match { case _: Running => executeAndLog(requestFromIndexName(imagesMigrationAlias), logMessageFromIndexName(imagesMigrationAlias), notFoundSuccessful = true).map(Some(_)) case _ => Future.successful(None) } // remove the optionality of the completed futures. runForCurrentIndex will always be Some, so there will always be a head. Future.sequence(List(runForCurrentIndex, runForMigrationIndex)).map(_.flatten.head) } def setMigrationInfo(imageId: String, migrationInfo: MigrationInfo)(implicit ex: ExecutionContext, logMarker: LogMarker): Future[Response[Any]] = { val esInfo = EsInfo(migration = Some(migrationInfo)) val container = Json.obj("esInfo" -> Json.toJson(esInfo)) val request = updateById(imagesCurrentAlias, imageId) .doc(Json.stringify(container)) executeAndLog(request, s"Setting migration info on image id: ${imageId}") } def directInsert(image: Image, indexName: String)(implicit ex: ExecutionContext, logMarker: LogMarker): Future[ElasticSearchInsertResponse] = executeAndLog( indexInto(indexName).id(image.id).source(Json.stringify(Json.toJson(image))), s"ES6 indexing image ${image.id} into index '$indexName'" ).map(indexResponse => ElasticSearchInsertResponse(indexResponse.result.index) ) def migrationAwareIndexImage(id: String, image: Image, lastModified: DateTime) (implicit ex: ExecutionContext, logMarker: LogMarker): Future[ElasticSearchUpdateResponse] = { // On insert, we know we will not have a lastModified to consider, so we always take the one we get val insertImage = image.copy(lastModified = Some(lastModified)) val insertImageAsJson = Json.toJson(insertImage) def runUpsertIntoIndex(indexAlias: String, maybeEsInfo: Option[JsObject]) = { val esInfo = maybeEsInfo.getOrElse(JsObject.empty) // On update, we do not want to take the one we have been given unless it is newer - see updateLastModifiedScript script val updateImage = image.copy(lastModified = None) val upsertImageAsJson = Json.toJson(updateImage) val painlessSource = // If there are old identifiers, then merge any new identifiers into old and use the merged results as the new identifiers """ | if (ctx._source.identifiers != null) { | ctx._source.identifiers.putAll(params.update_doc.identifiers); | params.update_doc.identifiers = ctx._source.identifiers | } | | ctx._source.putAll(params.update_doc); | | if (ctx._source.metadata != null && ctx._source.metadata.credit != null) { | ctx._source.suggestMetadataCredit = [ "input": [ ctx._source.metadata.credit ] ] | } """ val scriptSource = loadUpdatingModificationPainless(s""" |$painlessSource |$refreshEditsScript | """) val script: Script = prepareScript(scriptSource, lastModified, ("update_doc", asNestedMap(asImageUpdate(upsertImageAsJson.as[JsObject] ++ esInfo))) ) val indexRequest = updateById(indexAlias, id). upsert(Json.stringify(insertImageAsJson.as[JsObject] ++ esInfo)). script(script) executeAndLog(indexRequest, s"ES6 indexing image $id into index aliased by '$indexAlias'") } val runUpsertIntoMigrationIndexAndReturnEsInfoForCurrentIndex: Future[JsObject] = migrationStatus match { case running: Running => runUpsertIntoIndex(imagesMigrationAlias, maybeEsInfo = None) .map(_ => EsInfo(Some(MigrationInfo(migratedTo = Some(running.migrationIndexName))))) .recover { case error => EsInfo(Some(MigrationInfo(failures = Some(Map( running.migrationIndexName -> error.getMessage ))))) } .map(esInfo => Json.obj("esInfo" -> Json.toJson(esInfo))) case _ => Future.successful(JsObject.empty) } for { esInfo <- runUpsertIntoMigrationIndexAndReturnEsInfoForCurrentIndex _ <- runUpsertIntoIndex(imagesCurrentAlias, maybeEsInfo = Some(esInfo)) } yield ElasticSearchUpdateResponse() } def getImage(id: String)(implicit ex: ExecutionContext, logMarker: LogMarker): Future[Option[Image]] = { executeAndLog(get(imagesCurrentAlias, id), s"ES6 get image by $id").map { r => if (r.result.found) { Some(Json.parse(r.result.sourceAsString).as[Image]) } else { None } } } def getImageVersion(id: String)(implicit ex: ExecutionContext, logMarker: LogMarker = MarkerMap()): Future[Option[Long]] = { executeAndLog(get(imagesCurrentAlias, id), s"ES6 get image version by $id").map { r => if (r.result.found) { Some(r.result.version) } else { None } } } def updateImageUsages(id: String, usages: Seq[Usage], lastModified: DateTime) (implicit ex: ExecutionContext,logMarker: LogMarker): List[Future[ElasticSearchUpdateResponse]] = { val replaceUsagesScript = loadUpdatingModificationPainless(s""" | def lastUpdatedDate = ctx._source.usagesLastModified != null ? Date.from(Instant.from(DateTimeFormatter.ISO_DATE_TIME.parse(ctx._source.usagesLastModified))) : null; | if (lastUpdatedDate == null || modificationDate.after(lastUpdatedDate)) { | ctx._source.usages = params.usages; | ctx._source.usagesLastModified = params.lastModified; | } """) val usagesParameter = usages.map(i => asNestedMap(Json.toJson(i))) val eventualUpdateResponse = migrationAwareUpdater( requestFromIndexName = indexName => prepareUpdateRequest(indexName, id, replaceUsagesScript, lastModified, ("usages", usagesParameter)), logMessageFromIndexName = indexName => s"ES6 updating usages on image $id for index $indexName" ).incrementOnFailure(metrics.map(_.failedUsagesUpdates)){case _ => true} List(eventualUpdateResponse.map(_ => ElasticSearchUpdateResponse())) } def updateImageSyndicationRights(id: String, rights: Option[SyndicationRights], lastModified: DateTime) (implicit ex: ExecutionContext, logMarker: LogMarker): List[Future[ElasticSearchUpdateResponse]] = { val replaceSyndicationRightsScript = """ | ctx._source.syndicationRights = params.syndicationRights; """.stripMargin val rightsParameter = rights.map(sr => asNestedMap(sr)).orNull val scriptSource = loadUpdatingModificationPainless(replaceSyndicationRightsScript) List(migrationAwareUpdater( requestFromIndexName = indexName => prepareUpdateRequest(indexName, id, scriptSource, lastModified, ("syndicationRights", rightsParameter)), logMessageFromIndexName = indexName => s"ES6 updating syndicationRights on image $id in index $indexName with rights $rightsParameter" ).map(_ => ElasticSearchUpdateResponse())) } def applyImageMetadataOverride(id: String, metadata: Edits, lastModified: DateTime) (implicit ex: ExecutionContext, logMarker: LogMarker): List[Future[ElasticSearchUpdateResponse]] = { val photoshootSuggestionScript = """ | if (ctx._source.userMetadata.photoshoot != null) { | ctx._source.userMetadata.photoshoot.suggest = [ "input": [ ctx._source.userMetadata.photoshoot.title ] ]; | } """.stripMargin val metadataParameter = JsDefined(Json.toJson(metadata)).toOption.map(asNestedMap).orNull val replaceUserMetadata = """ | def lastUpdatedDate = ctx._source.userMetadataLastModified != null ? Date.from(Instant.from(DateTimeFormatter.ISO_DATE_TIME.parse(ctx._source.userMetadataLastModified))) : null; | if (lastUpdatedDate == null || modificationDate.after(lastUpdatedDate)) { | ctx._source.userMetadata = params.userMetadata; | ctx._source.userMetadataLastModified = params.lastModified; | } | """ val scriptSource = loadUpdatingModificationPainless( s""" | $replaceUserMetadata | $refreshEditsScript | $photoshootSuggestionScript """ ) /* TODO: It should never be possible for Edits to have an empty lastModified although there will be * messages in the stream where it is missing until the stream content expires so we fall back for now but can * remove this in a week or so after merging */ if (metadata.lastModified.isEmpty) logger.warn(logMarker, "edit object missing last modified value") val appliedLastModified = metadata.lastModified.getOrElse(lastModified) List(migrationAwareUpdater( requestFromIndexName = indexName => prepareUpdateRequest( indexName = indexName, id = id, scriptSource = scriptSource, lastModified = appliedLastModified, ("userMetadata", metadataParameter) ), logMessageFromIndexName = indexName => s"ES6 updating user metadata on image $id in index $indexName with lastModified $appliedLastModified" ).map(_ => ElasticSearchUpdateResponse())) } private def softDeletedMetadataAsPainlessScript(softDeletedMetadata: SoftDeletedMetadata) = { val applySoftDeleteScript = "ctx._source.softDeletedMetadata = params.softDeletedMetadata;" val softDeletedMetadataParameter = JsDefined(Json.toJson(softDeletedMetadata)).toOption.map(asNestedMap).orNull prepareScript( applySoftDeleteScript, lastModified = softDeletedMetadata.deleteTime, ("softDeletedMetadata", softDeletedMetadataParameter) ) } def applySoftDelete(id: String, softDeletedMetadata: SoftDeletedMetadata, lastModified: DateTime) (implicit ex: ExecutionContext, logMarker: LogMarker): List[Future[ElasticSearchUpdateResponse]] = { List(migrationAwareUpdater( requestFromIndexName = indexName => prepareUpdateRequest( indexName, id, softDeletedMetadataAsPainlessScript(softDeletedMetadata), ), logMessageFromIndexName = indexName => s"ES7 soft delete image $id in $indexName by ${softDeletedMetadata.deletedBy}" ).map(_ => ElasticSearchUpdateResponse())) } def applyUnSoftDelete(id: String, lastModified: DateTime) (implicit ex: ExecutionContext, logMarker: LogMarker): List[Future[ElasticSearchUpdateResponse]] = { val applyUnSoftDeleteScript = "ctx._source.remove(\"softDeletedMetadata\");" List(migrationAwareUpdater( requestFromIndexName = indexName => prepareUpdateRequest( indexName, id, applyUnSoftDeleteScript, lastModified ), logMessageFromIndexName = indexName => s"ES7 un soft delete image $id in $indexName" ).map(_ => ElasticSearchUpdateResponse())) } private def getNextBatchOfImageIdsForDeletion(query: Query, count: Int, deletionType: String) (implicit ex: ExecutionContext, logMarker: LogMarker) = executeAndLog( ElasticDsl.search(imagesCurrentAlias) // current index is sufficient for producing the list of IDs to delete .query(query) .storedFields("_id") .sortByFieldAsc("uploadTime") .size(count), s"ES7 searching for oldest $count images to $deletionType delete" ).map(_.result.hits.hits.map(_.id).toSet) private def countTotalReapable(query: Query, deletionType: String) (implicit ex: ExecutionContext, logMarker: LogMarker): Future[Long] = executeAndLog( ElasticDsl.count(imagesCurrentAlias).query(query), s"counting '$deletionType' reapable images" ).map(_.result.count) private def softReapableQuery(isReapable: ReapableEligibility) = must( isReapable.query, filters.existsOrMissing("softDeletedMetadata", exists = false) // not already soft deleted ) def countTotalSoftReapable(isReapable: ReapableEligibility)(implicit ex: ExecutionContext, logMarker: LogMarker): Future[Long] = countTotalReapable(softReapableQuery(isReapable), "soft") def softDeleteNextBatchOfImages(isReapable: ReapableEligibility, count: Int, softDeletedMetadata: SoftDeletedMetadata) (implicit ex: ExecutionContext, logMarker: LogMarker): Future[BatchDeletionIds] = { val query = softReapableQuery(isReapable) for { // unfortunately 'updateByQuery' doesn't return the affected IDs so can't do this whole thing in one operation - https://github.com/elastic/elasticsearch/issues/48624 ids <- getNextBatchOfImageIdsForDeletion(query, count, "soft") esResults <- if(ids.isEmpty) Future.successful(Seq.empty) else migrationAwareUpdater( requestFromIndexName = indexName => bulk(ids.map( updateById(indexName, _) .script(softDeletedMetadataAsPainlessScript(softDeletedMetadata)) )), logMessageFromIndexName = indexName => s"ES7 soft delete ${ids.size} images in $indexName by ${softDeletedMetadata.deletedBy}" ).map(_.result.items) } yield { if (ids.isEmpty) { logger.info(s"Although $count images were requested to be soft deleted, none were found to be soft deletable.") } esResults.filter(_.error.isDefined).foreach(item => logger.error(logMarker, s"ES7 failed to soft delete image ${item.id} : ${item.error.get}") ) BatchDeletionIds(ids, esResults.filter(_.error.isEmpty).map(_.id).toSet) } } private def hardReapableQuery(isReapable: ReapableEligibility, daysInSoftDeletedState: Int) = must( isReapable.query, filters.existsOrMissing("softDeletedMetadata", exists = true), // already soft deleted rangeQuery("softDeletedMetadata.deleteTime").lt(DateTime.now.minusDays(daysInSoftDeletedState).toString) // soft deleted more than 2 weeks ago (default) ) def countTotalHardReapable(isReapable: ReapableEligibility, daysInSoftDeletedState: Int)(implicit ex: ExecutionContext, logMarker: LogMarker): Future[Long] = countTotalReapable(hardReapableQuery(isReapable, daysInSoftDeletedState), "hard") def hardDeleteNextBatchOfImages(isReapable: ReapableEligibility, count: Int, daysInSoftDeletedState: Int) (implicit ex: ExecutionContext, logMarker: LogMarker): Future[BatchDeletionIds] = { val query = hardReapableQuery(isReapable, daysInSoftDeletedState) for { // unfortunately 'deleteByQuery' doesn't return the affected IDs so can't do this whole thing in one operation - https://github.com/elastic/elasticsearch/issues/45460 ids <- getNextBatchOfImageIdsForDeletion(query, count, "hard") esResults <- if(ids.isEmpty) Future.successful(Seq.empty) else migrationAwareUpdater( requestFromIndexName = indexName => bulk(ids.map( deleteById(indexName, _) )), logMessageFromIndexName = indexName => s"ES7 hard delete ${ids.size} images in $indexName" ).map(_.result.items) } yield { if (ids.isEmpty) { logger.info(s"Although $count images were requested to be hard deleted, none were found to be hard deletable.") } esResults.filter(_.error.isDefined).foreach(item => logger.error(logMarker, s"ES7 failed to hard delete image ${item.id} : ${item.error.get}") ) BatchDeletionIds(ids, esResults.filter(_.error.isEmpty).map(_.id).toSet) } } def getInferredSyndicationRightsImages(photoshoot: Photoshoot, excludedImageId: Option[String]) (implicit ex: ExecutionContext, logMarker: LogMarker): Future[List[Image]] = { // TODO could be a Seq val inferredSyndicationRights = not(termQuery("syndicationRights.isInferred", false)) // Using 'not' to include nulls val filter = excludedImageId match { case Some(imageId) => boolQuery() must( inferredSyndicationRights, not(idsQuery(imageId)) ) case _ => inferredSyndicationRights } val filteredMatches: BoolQuery = boolQuery() must( matchQuery(photoshootField("title"), photoshoot.title), filter ) val request = search(imagesCurrentAlias) bool filteredMatches limit 200 // TODO no order? executeAndLog(request, s"ES6 get images in photoshoot ${photoshoot.title} with inferred syndication rights (excluding $excludedImageId)").map { r => r.result.hits.hits.toList.map { h => Json.parse(h.sourceAsString).as[Image] } } } def getLatestSyndicationRights(photoshoot: Photoshoot, excludedImageId: Option[String]) (implicit ex: ExecutionContext, logMarker: LogMarker): Future[Option[Image]] = { val nonInferredSyndicationRights = termQuery("syndicationRights.isInferred", false) val filter = excludedImageId match { case Some(imageId) => boolQuery() must( nonInferredSyndicationRights, not(idsQuery(imageId)) ) case _ => nonInferredSyndicationRights } val filteredMatches = boolQuery() must( matchQuery(photoshootField("title"), photoshoot.title), filter ) val syndicationRightsPublishedDescending = fieldSort("syndicationRights.published").order(SortOrder.DESC) val request = search(imagesCurrentAlias) bool filteredMatches sortBy syndicationRightsPublishedDescending executeAndLog(request, s"ES6 get image in photoshoot ${photoshoot.title} with latest rcs syndication rights (excluding $excludedImageId)").map { r => r.result.hits.hits.toList.headOption.map { h => Json.parse(h.sourceAsString).as[Image] } } } private def deleteFromIndex(id: String, indexName: String, query: Query)(implicit ex: ExecutionContext, logMarker: LogMarker) = { executeAndLog(count(indexName).query(query), s"ES6 searching for image to delete: $id in index $indexName").flatMap { r => val deleteFuture = r.result.count match { case 1 => executeAndLog(deleteById(indexName, id), s"ES6 deleting image $id from index $indexName") case _ => Future.failed(ImageNotDeletable) } deleteFuture .incrementOnSuccess(metrics.map(_.deletedImages)) .incrementOnFailure(metrics.map(_.failedDeletedImages)) { case ImageNotDeletable => true } } } def deleteImage(id: String) (implicit ex: ExecutionContext, logMarker: LogMarker): List[Future[ElasticSearchDeleteResponse]] = { // search for the image first, and then only delete and succeed // this is because the delete query does not respond with anything useful // TODO: is there a more efficient way to do this? val deletableImage = boolQuery().withMust( idsQuery(id)).withNot( existsQuery("exports"), nestedQuery(path = "usages", query = existsQuery("usages")) ) (migrationStatus match { case running: Running => List(imagesCurrentAlias, running.migrationIndexName) case _ => List(imagesCurrentAlias) }).map { index => deleteFromIndex(id, index, deletableImage).map { _ => ElasticSearchDeleteResponse() } } } def deleteSingleImageUsage( id: String, usageId: String, lastModified: DateTime )(implicit ex: ExecutionContext, logMarker: LogMarker): List[Future[ElasticSearchUpdateResponse]] = { val deleteSingleUsageScript = loadUpdatingModificationPainless("ctx._source.usages.removeIf(usage -> usage.id == params.usageId);") val eventualUpdateResponse = migrationAwareUpdater( requestFromIndexName = indexName => prepareUpdateRequest(indexName, id, deleteSingleUsageScript, lastModified, "usageId" -> usageId), logMessageFromIndexName = indexName => s"ES6 removing usage $usageId on image $id in index $indexName", notFoundSuccessful = true ).incrementOnFailure(metrics.map(_.failedUsagesUpdates)) { case _ => true } List(eventualUpdateResponse.map(response => { if (response.status == 404) { logger.warn("Attempted to delete usage for non-existent image.") } ElasticSearchUpdateResponse() })) } def deleteAllImageUsages( id: String, lastModified: DateTime )(implicit ex: ExecutionContext, logMarker: LogMarker): List[Future[ElasticSearchUpdateResponse]] = { val deleteUsagesScript = loadUpdatingModificationPainless("ctx._source.remove('usages');") val eventualUpdateResponse = migrationAwareUpdater( requestFromIndexName = indexName => prepareUpdateRequest(indexName, id, deleteUsagesScript, lastModified), logMessageFromIndexName = indexName => s"ES6 removing all usages on image $id in index $indexName", notFoundSuccessful = true ).incrementOnFailure(metrics.map(_.failedUsagesUpdates)){case _ => true} List(eventualUpdateResponse.map(response => { if(response.status == 404){ logger.warn("Attempted to delete usages for non-existent image.") } ElasticSearchUpdateResponse() })) } def updateUsageStatus(id: String, usages: Seq[Usage], lastModified: DateTime) (implicit ex: ExecutionContext, logMarker: LogMarker): List[Future[ElasticSearchUpdateResponse]] = { val updateUsageStatusScript = s""" | for(int i = 0; i < ctx._source.usages.size(); i++) { | if(ctx._source.usages[i].id == params.usage.id) { | ctx._source.usages[i].status = params.usage.status; | ctx._source.usages[i].lastModified = params.lastModified; | ctx._source.usagesLastModified = params.lastModified; | } | } |""".stripMargin val scriptSource = loadUpdatingModificationPainless(updateUsageStatusScript) val usageParameters = JsDefined(Json.toJson(usages.head)).toOption.map(_.as[Usage]).map(i => asNestedMap(Json.toJson(i))).orNull List(migrationAwareUpdater( requestFromIndexName = indexName => prepareUpdateRequest(indexName, id, scriptSource, lastModified, ("usage", usageParameters)), logMessageFromIndexName = indexName => s"ES6 updating usagesRights on image $id and usages id ${usageParameters.get("id")} " + s"in index $indexName with usage $usageParameters" ).map(_ => ElasticSearchUpdateResponse())) } def deleteSyndicationRights(id: String, lastModified: DateTime) (implicit ex: ExecutionContext, logMarker: LogMarker): List[Future[ElasticSearchUpdateResponse]] = { val deleteSyndicationRightsScript = s""" | $modificationDateFormatting | ctx._source.remove('syndicationRights'); | $updateLastModifiedScript """.stripMargin val eventualUpdateResponse = migrationAwareUpdater( requestFromIndexName = indexName => prepareUpdateRequest(indexName, id, deleteSyndicationRightsScript, lastModified), logMessageFromIndexName = indexName => s"ES6 removing syndication rights on image $id in index $indexName", notFoundSuccessful = true ).incrementOnFailure(metrics.map(_.failedSyndicationRightsUpdates)){case _ => true} List(eventualUpdateResponse.map(_ => ElasticSearchUpdateResponse())) } def replaceImageLeases(id: String, leases: Seq[MediaLease], lastModified: DateTime) (implicit ex: ExecutionContext, logMarker: LogMarker): List[Future[ElasticSearchUpdateResponse]] = { val replaceLeasesScript = """ | ctx._source.leases = ["leases": params.leases, "lastModified": params.lastModified]; | """.stripMargin val scriptSource = loadUpdatingModificationPainless(replaceLeasesScript) val leasesParameter = leases.map(l => asNestedMap(Json.toJson(l))) val eventualUpdateResponse = migrationAwareUpdater( requestFromIndexName = indexName => prepareUpdateRequest(indexName, id, scriptSource, lastModified, ("leases", leasesParameter)), logMessageFromIndexName = indexName => s"ES6 updating all leases on image $id in index $indexName with: ${leases.toString}" ).incrementOnFailure(metrics.map(_.failedSyndicationRightsUpdates)){case _ => true} List(eventualUpdateResponse.map(_ => ElasticSearchUpdateResponse())) } private def prepareScript(scriptSource: String, lastModified: DateTime, params: (String, Object)*) = Script(script = scriptSource).lang("painless").param("lastModified", printDateTime(lastModified)).params(params) private def prepareUpdateRequest(indexName: String, id: String, script: Script): UpdateRequest = updateById(indexName, id).script(script) private def prepareUpdateRequest(indexName: String, id: String, scriptSource: String, lastModified: DateTime, params: (String, Object)*): UpdateRequest = prepareUpdateRequest(indexName, id, prepareScript(scriptSource, lastModified, params:_*)) def addImageLease(id: String, lease: MediaLease, lastModified: DateTime) (implicit ex: ExecutionContext, logMarker: LogMarker): List[Future[ElasticSearchUpdateResponse]] = { val addLeaseScript = """| if (ctx._source.leases == null || ctx._source.leases.leases == null) { | ctx._source.leases = ["leases": [params.lease], "lastModified": params.lastModified]; | } else { | ctx._source.leases.leases.add(params.lease); | ctx._source.leases.lastModified = params.lastModified; | } """.stripMargin val scriptSource = loadUpdatingModificationPainless(addLeaseScript) val leaseParameter = JsDefined(Json.toJson(lease)).toOption.map(_.as[MediaLease]).map(i => asNestedMap(Json.toJson(i))).orNull val eventualUpdateResponse = migrationAwareUpdater( requestFromIndexName = indexName => prepareUpdateRequest(indexName, id, scriptSource, lastModified, ("lease", leaseParameter)), logMessageFromIndexName = indexName => s"ES6 adding lease on image $id in index $indexName with: $leaseParameter" ).incrementOnFailure(metrics.map(_.failedUsagesUpdates)){case _ => true} List(eventualUpdateResponse.map(_ => ElasticSearchUpdateResponse())) } def removeImageLease(id: String, leaseId: Option[String], lastModified: DateTime) (implicit ex: ExecutionContext, logMarker: LogMarker): List[Future[ElasticSearchUpdateResponse]] = { val removeLeaseScript = """| | for(int i = 0; i < ctx._source.leases.leases.size(); i++) { | if (ctx._source.leases.leases[i].id == params.leaseId) { | ctx._source.leases.leases.remove(i); | ctx._source.leases.lastModified = params.lastModified; | } | } """ val scriptSource = loadUpdatingModificationPainless(removeLeaseScript) val leaseIdParameter = JsDefined(Json.toJson(leaseId)).toOption.map(_.as[String]).orNull val eventualUpdateResponse = migrationAwareUpdater( requestFromIndexName = indexName => prepareUpdateRequest(indexName, id, scriptSource, lastModified, ("leaseId", leaseIdParameter)), logMessageFromIndexName = indexName => s"ES6 removing lease with id $leaseIdParameter from image $id in index $indexName", notFoundSuccessful = true ).incrementOnFailure(metrics.map(_.failedUsagesUpdates)) { case _ => true } List(eventualUpdateResponse.map(_ => ElasticSearchUpdateResponse())) } def updateImageExports(id: String, exports: Seq[Crop], lastModified: DateTime) (implicit ex: ExecutionContext, logMarker: LogMarker): List[Future[ElasticSearchUpdateResponse]] = { val addExportsScript = """| if (ctx._source.exports == null) { | ctx._source.exports = params.exports; | } else { | ctx._source.exports.addAll(params.exports); | } """ val scriptSource = loadUpdatingModificationPainless(addExportsScript) val exportsParameter = JsDefined(Json.toJson(exports)).toOption.map { cs: JsValue => // TODO deduplicate with set collections cs.as[JsArray].value.map { c => asNestedMap(c) }.toSeq }.orNull val eventualUpdateResponse = migrationAwareUpdater( requestFromIndexName = indexName => prepareUpdateRequest(indexName, id, scriptSource, lastModified, ("exports", exportsParameter)), logMessageFromIndexName = indexName => s"ES6 updating exports on image $id in index $indexName" ).incrementOnFailure(metrics.map(_.failedExportsUpdates)) { case _ => true } List(eventualUpdateResponse.map(_ => ElasticSearchUpdateResponse())) } def deleteImageExports(id: String, lastModified: DateTime) (implicit ex: ExecutionContext, logMarker: LogMarker): List[Future[ElasticSearchUpdateResponse]] = { val deleteExportsScript = "ctx._source.remove('exports');" val scriptSource = loadUpdatingModificationPainless(deleteExportsScript) val eventualUpdateResponse = migrationAwareUpdater( requestFromIndexName = indexName => prepareUpdateRequest(indexName, id, scriptSource, lastModified), logMessageFromIndexName = indexName => s"ES6 removing exports from image $id in index $indexName", notFoundSuccessful = true ).incrementOnFailure(metrics.map(_.failedExportsUpdates)) { case _ => true } List(eventualUpdateResponse.map(_ => ElasticSearchUpdateResponse())) } def setImageCollections(id: String, collections: Seq[Collection], lastModified: DateTime) (implicit ex: ExecutionContext, logMarker: LogMarker): List[Future[ElasticSearchUpdateResponse]] = { val setImagesCollectionScript = "ctx._source.collections = params.collections;" val setImageCollectionsScript = loadUpdatingModificationPainless(setImagesCollectionScript) val collectionsParameter = JsDefined(Json.toJson(collections)).toOption.map { cs: JsValue => cs.as[JsArray].value.map { c => asNestedMap(c) }.toSeq }.orNull val eventualUpdateResponse = migrationAwareUpdater( requestFromIndexName = indexName => prepareUpdateRequest(indexName, id, setImageCollectionsScript, lastModified, ("collections", collectionsParameter)), logMessageFromIndexName = indexName => s"ES6 setting collections on image $id in index $indexName" ).incrementOnFailure(metrics.map(_.failedCollectionsUpdates)) { case _ => true } List(eventualUpdateResponse.map(_ => ElasticSearchUpdateResponse())) } private val scrollPageSize = 10000 private def handleImageIdScrollResponse( message: String, response: Response[SearchResponse] )(implicit ec: ExecutionContext, logMarker: LogMarker): Future[Seq[String]] = { val ids = response.result.hits.hits.toSeq.map(_.id) if (response.result.hits.size >= scrollPageSize && response.result.scrollId.isDefined) { continueScrollingImageIds(message, response.result.scrollId.get).map(ids ++ _) } else { if (response.result.scrollId.isDefined) { closeScroll(response.result.scrollId.get) } Future.successful(ids) } } private def continueScrollingImageIds( message: String, scrollId: String )(implicit ec: ExecutionContext, logMarker: LogMarker): Future[Seq[String]] = { val req = searchScroll(scrollId) executeAndLog(req, message) .flatMap(response => handleImageIdScrollResponse(message, response)) } def listImageIdsWithPrefix(prefix: String)( implicit ec: ExecutionContext, logMarker: LogMarker ): Future[Seq[String]] = { val req = search(imagesCurrentAlias) .size(scrollPageSize) .fetchSource(false) .scroll(60.seconds) .query(prefixQuery("id", prefix)) executeAndLog(req, s"listing ids with prefix $prefix") .flatMap(response => handleImageIdScrollResponse(s"listing ids with prefix $prefix", response)) } def listImageIdsWithUnexpectedFormat()(implicit ec: ExecutionContext, logMarker: LogMarker) = { val req = search(imagesCurrentAlias) .size(scrollPageSize) .fetchSource(false) .scroll(60.seconds) .query(not(regexQuery("id", "[0-9a-f]{40}"))) val message = s"listing ids with unexpected format" executeAndLog(req, message).flatMap(response => handleImageIdScrollResponse(message, response)) } private val refreshMetadataScript = """ | ctx._source.metadata = new HashMap(); | if (ctx._source.originalMetadata != null) { | ctx._source.metadata.putAll(ctx._source.originalMetadata); | } | if (ctx._source.userMetadata != null && ctx._source.userMetadata.metadata != null) { | ctx._source.metadata.putAll(ctx._source.userMetadata.metadata); | } | ctx._source.metadata = ctx._source.metadata.entrySet().stream().filter(x -> x.value != "").collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)); """.stripMargin private val refreshUsageRightsScript = """ | if (ctx._source.userMetadata != null && ctx._source.userMetadata.usageRights != null) { | ctx._source.usageRights = new HashMap(); | ctx._source.usageRights.putAll(ctx._source.userMetadata.usageRights); | } else if (ctx._source.originalUsageRights == null){ | ctx._source.usageRights = null; | } else { | ctx._source.usageRights = new HashMap(); | ctx._source.usageRights.putAll(ctx._source.originalUsageRights); | } """.stripMargin private val refreshEditsScript = refreshMetadataScript + refreshUsageRightsScript private def loadPainless(str: String) = str.stripMargin.split('\n').map(_.trim.filter(_ >= ' ')).mkString // remove ctrl chars and leading, trailing whitespace private def loadUpdatingModificationPainless(str: String) = loadPainless(modificationDateFormatting + "\n" + str + "\n" + updateLastModifiedScript) private val modificationDateFormatting = """ | def modificationDate = Date.from(Instant.from(DateTimeFormatter.ISO_DATE_TIME.parse(params.lastModified))); """ // Script that updates the "lastModified" property using the "lastModified" parameter private val updateLastModifiedScript = """ | def lastModifiedDate = ctx._source.lastModified != null ? Date.from(Instant.from(DateTimeFormatter.ISO_DATE_TIME.parse(ctx._source.lastModified))) : null; | if (lastModifiedDate == null || modificationDate.after(lastModifiedDate)) { | ctx._source.lastModified = params.lastModified; | } """.stripMargin @nowarn("cat=deprecation") // TODO ScalaObjectMapper is deprecated because unusable in Scala 3 private def asNestedMap(sr: SyndicationRights) = { // TODO not great; there must be a better way to flatten a case class into a Map import com.fasterxml.jackson.databind.ObjectMapper import com.fasterxml.jackson.module.scala.DefaultScalaModule import com.fasterxml.jackson.module.scala.experimental.ScalaObjectMapper val mapper = new ObjectMapper() with ScalaObjectMapper mapper.registerModule(DefaultScalaModule) mapper.readValue[Map[String, Object]](Json.stringify(Json.toJson(sr))) } @nowarn("cat=deprecation") // TODO ScalaObjectMapper is deprecated because unusable in Scala 3 private def asNestedMap(i: JsValue) = { // TODO not great; there must be a better way to flatten a case class into a Map import com.fasterxml.jackson.databind.ObjectMapper import com.fasterxml.jackson.module.scala.DefaultScalaModule import com.fasterxml.jackson.module.scala.experimental.ScalaObjectMapper val mapper = new ObjectMapper() with ScalaObjectMapper mapper.registerModule(DefaultScalaModule) mapper.readValue[Map[String, Object]](Json.stringify(i)) } private def asImageUpdate(image: JsValue): JsValue = { def removeUploadInformation(): Reads[JsObject] = (__ \ "uploadTime").json.prune andThen (__ \ "userMetadata").json.prune andThen (__ \ "exports").json.prune andThen (__ \ "uploadedBy").json.prune andThen (__ \ "collections").json.prune andThen (__ \ "leases").json.prune andThen (__ \ "usages").json.prune image.transform(removeUploadInformation()).get } }