in common/src/main/scala/com/gerritforge/analytics/infrastructure/esSparkWriter.scala [39:77]
/**
  * Writes `data` into a freshly named, time-stamped index and then re-points `aliasName` at it.
  *
  * The index name comes from `IndexNameGenerator.timeBasedIndexName(aliasName, Instant.now())`
  * and documents are stored under `<index name>/<documentType>`.
  *
  * @param aliasName    alias that should be moved onto the newly written index
  * @param documentType document type appended to the index name to build the persistence path
  * @return an `EnrichedAliasActionResponse` holding the future outcome of the alias update and
  *         the persistence path that was written. The future is failed when saving throws a
  *         non-fatal error or when the alias update is not successful.
  */
def saveToEsWithAliasSwap(
    aliasName: String,
    documentType: String
): EnrichedAliasActionResponse = {
  val newIndexNameWithTime = IndexNameGenerator.timeBasedIndexName(aliasName, Instant.now())
  val newPersistencePath   = s"$newIndexNameWithTime/$documentType"
  logger.info(
    s"Storing data into $newPersistencePath and swapping alias $aliasName to read from the new index"
  )
  // NOTE(review): the global ExecutionContext is hard-wired here; accepting one from the caller
  // would be preferable but would change the method signature.
  import scala.concurrent.ExecutionContext.Implicits.global

  val futureResponse: Future[AliasActionResponse] = try {
    // Save data; any non-fatal error thrown here is turned into a failed future below.
    data
      .toDF()
      .saveToEs(newPersistencePath)
    logger.info(
      s"Successfully stored the data into index $newIndexNameWithTime. Will now update the alias $aliasName"
    )
    moveAliasToNewIndex(aliasName, newIndexNameWithTime)
      .flatMap { response =>
        // NOTE(review): assumes `response.error` is only safe to read on a failed response and
        // `response.result` only on a successful one (as with elastic4s's Response) - confirm.
        // Each accessor is therefore touched only in the branch where it is known to be valid.
        if (!response.isSuccess) {
          logger.error(s"Alias update failed with response result error ${response.error}")
          Future.failed[AliasActionResponse](
            new Exception(s"Index alias $aliasName update failure ${response.error}")
          )
        } else if (!response.result.success) {
          logger.error(s"Alias update failed with ES ACK: ${response.result.acknowledged}")
          Future.failed[AliasActionResponse](
            new Exception(
              s"Index alias $aliasName update failure, ES ACK: ${response.result.acknowledged}"
            )
          )
        } else {
          logger.info("Alias was updated successfully")
          Future.successful(response.result)
        }
      }
      // Release the client whatever the outcome, including when the alias request itself fails.
      .andThen { case _ => closeElasticsearchClientConn() }
  } catch {
    // Fatal errors (OOM, interruption, control throwables) are left to propagate.
    case scala.util.control.NonFatal(e) =>
      Future.failed[AliasActionResponse](e)
  }
  EnrichedAliasActionResponse(futureResponse, newPersistencePath)
}