in thrall/app/lib/elasticsearch/ElasticSearch.scala [81:150]
def directInsert(image: Image, indexName: String)(implicit ex: ExecutionContext, logMarker: LogMarker): Future[ElasticSearchInsertResponse] =
executeAndLog(
indexInto(indexName).id(image.id).source(Json.stringify(Json.toJson(image))),
s"ES6 indexing image ${image.id} into index '$indexName'"
).map(indexResponse =>
ElasticSearchInsertResponse(indexResponse.result.index)
)
def migrationAwareIndexImage(id: String, image: Image, lastModified: DateTime)
(implicit ex: ExecutionContext, logMarker: LogMarker): Future[ElasticSearchUpdateResponse] = {
// On insert, we know we will not have a lastModified to consider, so we always take the one we get
val insertImage = image.copy(lastModified = Some(lastModified))
val insertImageAsJson = Json.toJson(insertImage)
def runUpsertIntoIndex(indexAlias: String, maybeEsInfo: Option[JsObject]) = {
val esInfo = maybeEsInfo.getOrElse(JsObject.empty)
// On update, we do not want to take the one we have been given unless it is newer - see updateLastModifiedScript script
val updateImage = image.copy(lastModified = None)
val upsertImageAsJson = Json.toJson(updateImage)
val painlessSource =
// If there are old identifiers, then merge any new identifiers into old and use the merged results as the new identifiers
"""
| if (ctx._source.identifiers != null) {
| ctx._source.identifiers.putAll(params.update_doc.identifiers);
| params.update_doc.identifiers = ctx._source.identifiers
| }
|
| ctx._source.putAll(params.update_doc);
|
| if (ctx._source.metadata != null && ctx._source.metadata.credit != null) {
| ctx._source.suggestMetadataCredit = [ "input": [ ctx._source.metadata.credit ] ]
| }
"""
val scriptSource = loadUpdatingModificationPainless(s"""
|$painlessSource
|$refreshEditsScript
| """)
val script: Script = prepareScript(scriptSource, lastModified,
("update_doc", asNestedMap(asImageUpdate(upsertImageAsJson.as[JsObject] ++ esInfo)))
)
val indexRequest = updateById(indexAlias, id).
upsert(Json.stringify(insertImageAsJson.as[JsObject] ++ esInfo)).
script(script)
executeAndLog(indexRequest, s"ES6 indexing image $id into index aliased by '$indexAlias'")
}
val runUpsertIntoMigrationIndexAndReturnEsInfoForCurrentIndex: Future[JsObject] = migrationStatus match {
case running: Running =>
runUpsertIntoIndex(imagesMigrationAlias, maybeEsInfo = None)
.map(_ => EsInfo(Some(MigrationInfo(migratedTo = Some(running.migrationIndexName)))))
.recover { case error => EsInfo(Some(MigrationInfo(failures = Some(Map(
running.migrationIndexName -> error.getMessage
))))) }
.map(esInfo => Json.obj("esInfo" -> Json.toJson(esInfo)))
case _ => Future.successful(JsObject.empty)
}
for {
esInfo <- runUpsertIntoMigrationIndexAndReturnEsInfoForCurrentIndex
_ <- runUpsertIntoIndex(imagesCurrentAlias, maybeEsInfo = Some(esInfo))
} yield ElasticSearchUpdateResponse()
}