def saveToEsWithAliasSwap()

in common/src/main/scala/com/gerritforge/analytics/infrastructure/esSparkWriter.scala [39:77]


  def saveToEsWithAliasSwap(
      aliasName: String,
      documentType: String
  ): EnrichedAliasActionResponse = {
    val newIndexNameWithTime = IndexNameGenerator.timeBasedIndexName(aliasName, Instant.now())
    val newPersistencePath   = s"$newIndexNameWithTime/$documentType"

    logger.info(
      s"Storing data into $newPersistencePath and swapping alias $aliasName to read from the new index"
    )

    import scala.concurrent.ExecutionContext.Implicits.global
    // Save data
    val futureResponse: Future[AliasActionResponse] = try {
      data
        .toDF()
        .saveToEs(newPersistencePath)

      logger.info(
        s"Successfully stored the data into index $newIndexNameWithTime. Will now update the alias $aliasName"
      )
      moveAliasToNewIndex(aliasName, newIndexNameWithTime).flatMap { response =>
        if (response.isSuccess && response.result.success) {
          logger.info("Alias was updated successfully")
          closeElasticsearchClientConn()
          Future.successful(response.result)
        } else {
          closeElasticsearchClientConn()
          logger.error(s"Alias update failed with response result error ${response.error}")
          logger.error(s"Alias update failed with ES ACK: ${response.result.acknowledged}")
          Future.failed(new Exception(s"Index alias $aliasName update failure ${response.error}"))
        }
      }
    } catch {
      case e: Exception =>
        Future.failed[AliasActionResponse](e)
    }
    EnrichedAliasActionResponse(futureResponse, newPersistencePath)
  }