in scripts/src/main/scala/com/gu/mediaservice/scripts/EsScript.scala [23:188]
def run(esUrl: String, args: List[String]) = {
/**
  * Elasticsearch client bound to `esUrl` for this run.
  *
  * On top of the inherited client operations it derives `nextIndex`, the default name of the
  * index to reindex into, by incrementing the numeric suffix of the current index name.
  */
object IndexClient extends EsClient(esUrl) {
  // Matches versioned index names such as `images_5`; the capture group is the version number.
  val srcIndexVersionCheck = """images_(\d+)""".r

  // Version taken from the current index name; a name that does not match counts as version 1.
  // `this.` is deliberate: it selects the inherited member rather than any outer `currentIndex`.
  val srcIndexVersion: Int = this.currentIndex match {
    case srcIndexVersionCheck(digits) => digits.toInt
    case _ => 1
  }

  // Index name one version above the current one.
  val nextIndex: String = {
    val nextVersion = srcIndexVersion + 1
    s"${imagesIndexPrefix}_$nextVersion"
  }
}
/**
  * Reports a fatal reindex problem on stderr, closes the Elasticsearch client and
  * terminates the JVM with exit status 1.
  *
  * @param msg description of the problem, appended to the error line
  */
def raiseError(msg: String) = {
  val report = Seq(s"Reindex error on: $esUrl : $msg ", "Exiting...")
  report.foreach(line => System.err.println(line))
  // Release the client before the process goes away.
  IndexClient.client.close()
  System.exit(1)
}
/**
  * Sanity-checks the run parameters before any work starts; a violation is reported
  * through `raiseError`.
  *
  * @param esClient client the run will use (not consulted by the current checks)
  * @param from     optional lower time bound of the reindex; must not lie in the future
  */
def validateCurrentState(esClient: ElasticSearchClient, from: Option[DateTime]) = {
  val startsInFuture = from match {
    case Some(time) => time.isAfter(DateTime.now())
    case None => false
  }
  if (startsInFuture)
    raiseError("DateTime parameter 'from' must be earlier than the current time" )
}
/**
  * Looks up a `KEY=value` style argument in `args`.
  *
  * The key must be the start of the argument, so `OLD_FROM_TIME=...` does not satisfy a
  * lookup for `FROM_TIME`, and the key is treated as literal text rather than as a regular
  * expression.
  *
  * @param argKey name of the argument, without the trailing `=`
  * @return the text after `argKey=` of the first matching argument, or `None` if there is none
  */
def getArg(argKey: String): Option[String] = {
  val prefix = s"${argKey}="
  args.collectFirst {
    case arg if arg.startsWith(prefix) => arg.stripPrefix(prefix)
  }
}
// Scroll settings used when reading the source index: keep-alive per scroll request and page size.
val scrollTime = FiniteDuration(5, TimeUnit.MINUTES)
val scrollSize = 500

// Source index, read once up front.
val currentIndex = IndexClient.currentIndex
// Target index: the NEW_INDEX argument when supplied, otherwise the next versioned index name.
val newIndex = getArg("NEW_INDEX").getOrElse(IndexClient.nextIndex)
// Optional FROM_TIME argument restricting the reindex to documents modified since that time.
val from = getArg("FROM_TIME").map(arg => DateTime.parse(arg))

validateCurrentState(IndexClient, from)
// Block until the reindex has completed.
Await.result(reindex(from, IndexClient), Duration.Inf)

println(s"Pointing $esImagesReadAlias to new index: $newIndex")
IndexClient.changeAliasTo(newIndex, currentIndex, esImagesReadAlias)
println(s"Finished reindexing from $currentIndex to $newIndex")
IndexClient.client.close()
def reindex(from: Option[DateTime], esClient: ElasticSearchClient) : Future[SearchResponse] = {
def _scroll(scroll: SearchResponse, done: Long = 0): Future[SearchResponse] = {
// Upper bound on the number of documents handled once the current page has been processed.
val currentBatch = done + scrollSize
System.out.println(scrollPercentage(scroll, currentBatch, done))
/**
  * Copies one page of search hits into `newIndex` with a single bulk request, keeping each
  * document's id and source.
  *
  * @param hits the hits of the current scroll page
  * @return the bulk response returned by Elasticsearch
  * @throws Exception if the bulk call does not come back with status 200; the client is
  *                   closed before throwing
  */
def bulkFromHits(hits: Array[SearchHit]): BulkResponse = {
  val indexRequests: Array[IndexRequest] =
    for (hit <- hits) yield indexInto(newIndex).withId(hit.id).source(hit.sourceAsString)
  val response = IndexClient.client.execute(bulk(indexRequests)).await
  if (response.status == 200) response.result
  else {
    IndexClient.client.close()
    throw new Exception("Failed performing bulk index")
  }
}
/**
  * Builds the progress line printed before each page is processed.
  *
  * @param scroll       scroll page whose total hit count is the denominator
  * @param currentBatch number of documents handled once this page is done (capped at the total)
  * @param done         number of documents handled before this page
  * @return a message of the form `Reindexing <n> of <total> (<percentage>%)`
  */
def scrollPercentage(scroll: SearchResponse, currentBatch: Long, done: Long): String = {
  val total = scroll.hits.total.value
  // roughly accurate as we're using done, which is relative to scrollSize, rather than the actual number of docs in the new index
  val percentage =
    if (total == 0) 100f // nothing to copy: avoid 0/0, which would print "NaN%"
    else (Math.min(done, total).toFloat / total) * 100
  s"Reindexing ${Math.min(currentBatch,total)} of $total ($percentage%)"
}
/**
  * Fetches the next page of an open scroll.
  *
  * @param scrollId   id of the scroll to continue
  * @param scrollTime keep-alive requested for the scroll
  * @return the search response holding the next page
  * @throws Exception if the scroll call does not come back with status 200; the client is
  *                   closed before throwing
  */
def performScroll(scrollId: String, scrollTime: FiniteDuration): SearchResponse = {
  val scrollResponse = IndexClient.client.execute({
    searchScroll(scrollId)
      .keepAlive(scrollTime)
  }).await
  scrollResponse.status match {
    case 200 => scrollResponse.result
    case status =>
      IndexClient.client.close()
      // Name the operation that actually failed (this used to claim a bulk index failure).
      throw new Exception(s"Failed performing scroll (status $status)")
  }
}
/**
  * Prints a one-line summary of a bulk response, followed by the ids of any failed items.
  *
  * An item counts as successful when its status is in the 2xx range: 201 for a newly created
  * document, 200 for one that overwrote an existing document (as happens when changes are
  * re-copied into an index that already holds the document).
  *
  * @param bulkResponse response of the bulk request to summarise
  */
def analyseBulkResponse(bulkResponse: BulkResponse) = {
  val (succeeded, failed) =
    bulkResponse.items.partition(item => item.status >= 200 && item.status < 300)
  val failures = failed.map(item => item.id)
  println(s"...added ${succeeded.length}/${bulkResponse.items.length} items in ${bulkResponse.took} ms (${failures.length} failures)")
  if (failures.nonEmpty) println(s"......failure IDs: $failures")
}
// An empty page marks the end of the scroll; otherwise copy the page, report on it and
// carry on with the next page.
val pageHits = scroll.hits.hits
if (pageHits.isEmpty) {
  println("No more results found")
  Future.successful[SearchResponse](scroll)
} else {
  analyseBulkResponse(bulkFromHits(pageHits))
  val nextPage = performScroll(scroll.scrollId.get, scrollTime)
  _scroll(nextPage, currentBatch)
}
}
/**
  * Starts a scrolled search over `currentIndex`.
  *
  * @param from when defined, only documents whose `lastModified` lies between this time and
  *             now are selected; when empty, all documents are selected
  * @return the first page of the scroll
  * @throws Exception if the search does not come back with status 200; the client is closed
  *                   before throwing
  */
def query(from: Option[DateTime]) : SearchResponse = {
  val queryType = from.map(time =>
    // Use the value handed to the lambda rather than calling `from.get` again.
    rangeQuery("lastModified").gte(time.getMillis).lte(DateTime.now.getMillis)
  ).getOrElse(
    matchAllQuery()
  )
  val queryResponse = IndexClient.client.execute({
    search(currentIndex)
      // .types(Mappings.dummyType)
      .scroll(scrollTime)
      .size(scrollSize)
      .query(queryType)
  }).await
  queryResponse.status match {
    case 200 => queryResponse.result
    case _ =>
      IndexClient.client.close()
      throw new Exception("Failed performing search query")
  }
}
// If no 'from' time parameter is passed, create a new index; otherwise the run only copies
// documents modified since that time.
from match {
  case None => IndexClient.createImageIndex(newIndex)
  // Print the timestamp itself rather than the Option wrapper ("Some(...)").
  case Some(time) => println(s"Reindexing documents modified since: $time")
}
val startTime = DateTime.now()
println(s"Reindex started at: $startTime")
println(s"Reindexing from: ${IndexClient.currentIndex} to: $newIndex")
val scrollResponse = query(from)
_scroll(scrollResponse) flatMap { response =>
  println(s"Pointing ${IndexClient.imagesCurrentAlias} to new index: $newIndex")
  IndexClient.changeAliasTo(newIndex, currentIndex)
  // Count the documents modified since this pass began, so they can be copied in a further pass.
  val changedDocuments: Long = query(Option(startTime)).hits.total.value
  println(s"$changedDocuments changed documents since start")
  if (changedDocuments > 0) {
    println(s"Reindexing changes since start time: $startTime")
    reindex(Option(startTime), esClient)
  } else {
    Future.successful(response)
  }
}
}
}