def run()

in scripts/src/main/scala/com/gu/mediaservice/scripts/EsScript.scala [23:188]


  /**
    * Reindexes image documents from the index currently used by [[EsClient]] into a new index,
    * then points `esImagesReadAlias` at the new index.
    *
    * Recognised arguments, each given as `KEY=value`:
    *  - `NEW_INDEX`: destination index name. Defaults to `<imagesIndexPrefix>_<n+1>`, where `n`
    *    is parsed from a current index named `images_<n>` (or 1 if the name does not match).
    *  - `FROM_TIME`: a `DateTime.parse`-able timestamp. When present, only documents whose
    *    `lastModified` lies between that time and now are copied, and no new index is created.
    *
    * Exits the JVM with status 1 if `FROM_TIME` is later than the current time.
    *
    * @param esUrl Elasticsearch endpoint passed to [[EsClient]]
    * @param args  `KEY=value` style arguments
    */
  def run(esUrl: String, args: List[String]) = {

    object IndexClient extends EsClient(esUrl) {
      // Derive the next index name from a numeric suffix on the current index (images_3 -> _4).
      // A current index name that does not match falls back to version 1, giving `_2`.
      val srcIndexVersionCheck = """images_(\d+)""".r
      val srcIndexVersion = this.currentIndex match {
        case srcIndexVersionCheck(version) => version.toInt
        case _ => 1
      }
      val nextIndex = s"${imagesIndexPrefix}_${srcIndexVersion+1}"
    }

    /** Prints `msg` to stderr, closes the ES client and exits the JVM with status 1. */
    def raiseError(msg: String) = {
      System.err.println(s"Reindex error on: $esUrl : $msg ")
      System.err.println("Exiting...")
      IndexClient.client.close()
      System.exit(1)
    }

    /** Aborts via [[raiseError]] when `from` is later than the current time. */
    def validateCurrentState(esClient: ElasticSearchClient, from: Option[DateTime]) = {
      if (from.exists(_.isAfter(DateTime.now())))
        raiseError("DateTime parameter 'from' must be earlier than the current time" )
    }

    /** Value of the first argument containing `KEY=`, with the first `KEY=` occurrence removed. */
    def getArg(argKey: String): Option[String] = {
      args.find(_ contains s"${argKey}=")
        .map(_ replaceFirst(s"${argKey}=", ""))
    }

    val scrollTime = new FiniteDuration(5, TimeUnit.MINUTES)
    val scrollSize = 500
    val currentIndex = IndexClient.currentIndex
    val newIndex = getArg("NEW_INDEX").getOrElse(IndexClient.nextIndex)
    val from = getArg("FROM_TIME").map(arg => DateTime.parse(arg))

    validateCurrentState(IndexClient, from)
    Await.result(reindex(from, IndexClient), Duration.Inf)
    println(s"Pointing $esImagesReadAlias to new index: $newIndex")
    IndexClient.changeAliasTo(newIndex, currentIndex, esImagesReadAlias)
    println(s"Finished reindexing from $currentIndex to $newIndex")
    IndexClient.client.close()

    /**
      * Scroll-copies documents from `currentIndex` into `newIndex`. It then calls
      * `changeAliasTo(newIndex, currentIndex)` (logged as moving `imagesCurrentAlias`).
      * Documents modified in the source while the copy ran are replayed by recursing with the
      * copy's start time as the new `from`.
      *
      * @param from     if defined, copy only documents with `lastModified` in `[from, now]` into
      *                 the existing `newIndex`; if empty, `newIndex` is created first
      * @param esClient passed through unchanged to the recursive catch-up pass
      * @return the final, empty scroll page of the last pass
      */
    def reindex(from: Option[DateTime], esClient: ElasticSearchClient) : Future[SearchResponse] = {

      // Single failure path: close the shared ES client, then throw.
      def abortWith(msg: String): Nothing = {
        IndexClient.client.close()
        throw new Exception(msg)
      }

      // Indexes one page of hits into newIndex, preserving each document's id and raw source.
      def bulkFromHits(hits: Array[SearchHit]): BulkResponse = {
        val bulkRequests: Array[IndexRequest] = hits.map { hit =>
          indexInto(newIndex)
            .withId(hit.id)
            .source(hit.sourceAsString)
        }

        val bulkResponse = IndexClient.client.execute({
          bulk(bulkRequests)
        }).await

        bulkResponse.status match {
          case 200 => bulkResponse.result
          case _ => abortWith("Failed performing bulk index")
        }
      }

      // Progress line. It is approximate because `done` counts requested batch sizes, not
      // documents actually written to the new index.
      def scrollPercentage(scroll: SearchResponse, currentBatch: Long, done: Long): String = {
        val total = scroll.hits.total.value
        // With nothing to copy, report completion rather than dividing by zero (NaN%).
        val percentage =
          if (total == 0) 100f
          else (Math.min(done, total).toFloat / total) * 100
        s"Reindexing ${Math.min(currentBatch,total)} of $total ($percentage%)"
      }

      // Fetches the next page of an open scroll, renewing its keep-alive.
      def performScroll(scrollId: String, scrollTime: FiniteDuration): SearchResponse = {
        val scrollResponse = IndexClient.client.execute({
          searchScroll(scrollId)
            .keepAlive(scrollTime)
        }).await

        scrollResponse.status match {
          case 200 => scrollResponse.result
          case _ => abortWith("Failed performing scroll")
        }
      }

      // Logs the outcome of one bulk page. Any 2xx item is a success. Elasticsearch returns 201
      // for a newly created document and 200 when an existing one is overwritten, which is
      // expected on incremental passes (`from` defined).
      def analyseBulkResponse(bulkResponse: BulkResponse): Unit = {
        val (successes, failures) =
          bulkResponse.items.partition(item => item.status >= 200 && item.status < 300)
        val failureIds = failures.map(item => item.id)
        println(s"...added ${successes.length}/${bulkResponse.items.length} items in ${bulkResponse.took} ms (${failures.length} failures)")
        if (failureIds.nonEmpty) println(s"......failure IDs: $failureIds")
      }

      // Copies the current page, then fetches and copies the next, until a page comes back empty.
      // @tailrec guarantees this compiles to a loop, so large indices cannot overflow the stack.
      @scala.annotation.tailrec
      def _scroll(scroll: SearchResponse, done: Long = 0): Future[SearchResponse] = {
        val currentBatch = done + scrollSize
        System.out.println(scrollPercentage(scroll, currentBatch, done))

        val hits = scroll.hits.hits
        if (hits.nonEmpty) {
          analyseBulkResponse(bulkFromHits(hits))
          val scrollId = scroll.scrollId.getOrElse(abortWith("Scroll response did not include a scroll id"))
          _scroll(performScroll(scrollId, scrollTime), currentBatch)
        } else {
          println("No more results found")
          Future.successful[SearchResponse](scroll)
        }
      }

      // Opens a scroll over currentIndex: all documents, or only those with lastModified in
      // [since, now] when `from` is defined.
      def query(from: Option[DateTime]) : SearchResponse = {
        val queryType = from
          .map(since => rangeQuery("lastModified").gte(since.getMillis).lte(DateTime.now.getMillis))
          .getOrElse(matchAllQuery())

        val queryResponse = IndexClient.client.execute({
          search(currentIndex)
            .scroll(scrollTime)
            .size(scrollSize)
            .query(queryType)
        }).await

        queryResponse.status match {
          case 200 => queryResponse.result
          case _ => abortWith("Failed performing search query")
        }
      }

      // A full reindex (no 'from') needs a fresh destination index; an incremental pass reuses it.
      from match {
        case None => IndexClient.createImageIndex(newIndex)
        case Some(since) => println(s"Reindexing documents modified since: $since")
      }

      val startTime = DateTime.now()
      println(s"Reindex started at: $startTime")
      println(s"Reindexing from: ${IndexClient.currentIndex} to: $newIndex")
      val scrollResponse = query(from)
      _scroll(scrollResponse) flatMap { response =>
        println(s"Pointing ${IndexClient.imagesCurrentAlias} to new index: $newIndex")
        IndexClient.changeAliasTo(newIndex, currentIndex)

        // Documents modified in the source index after this pass started have not been copied.
        val changedDocuments: Long = query(Option(startTime)).hits.total.value
        println(s"$changedDocuments changed documents since start")

        if (changedDocuments > 0) {
          println(s"Reindexing changes since start time: $startTime")
          reindex(Option(startTime), esClient)
        } else {
          Future.successful(response)
        }
      }
    }
  }