in eventconsumer/src/main/scala/com/gu/notifications/events/AthenaMetrics.scala [30:60]
/**
  * Registers one Athena partition per hour of the reporting window on the
  * `raw_events_$stage` table, but only when the window asks for a re-index.
  *
  * One `ALTER TABLE ... ADD IF NOT EXISTS PARTITION` query is started for every
  * hour from `reportingWindow.start` up to and including `reportingWindow.end`.
  * All queries are started up front; the returned future merely waits for them.
  *
  * @param reportingWindow      window to index; nothing is submitted unless `reIndex` is set
  * @param athenaDatabase       database name handed to each [[Query]]
  * @param athenaOutputLocation output location handed to each [[Query]]
  * @return a future that completes once every started query has completed. Its value is
  *         the result of the query for the first hour of the window, or `""` when no
  *         query was started (re-indexing disabled, or the window's start is after its end).
  */
private def addS3PartitionsToAthenaIndex(
  reportingWindow: ReportingWindow,
  athenaDatabase: String,
  athenaOutputLocation: String
)(implicit amazonAthenaAsync: AmazonAthenaAsync, scheduledExecutorService: ScheduledExecutorService
): Future[String] = {
  // Walks the window one hour at a time, starting a query for each hour and
  // prepending its future, so the resulting list is in reverse chronological order.
  @tailrec
  def addPartitionFrom(fromTime: ZonedDateTime, started: List[Future[String]] = List()): List[Future[String]] = {
    if (fromTime.isAfter(reportingWindow.end)) {
      started
    }
    else {
      val integerHour = fromTime.getHour
      // Hours are always rendered as two digits, e.g. 7 becomes "07".
      val hour = if (integerHour < 10) s"0$integerHour" else integerHour.toString
      val date = Athena.toQueryDate(fromTime)
      val request = s"""
        |ALTER TABLE raw_events_$stage
        |ADD IF NOT EXISTS PARTITION (date='$date', hour=$hour)
        |LOCATION '${envDependencies.ingestLocation}/date=$date/hour=$hour/'
        |""".stripMargin
      val list = Athena.startQuery(Query(athenaDatabase, request, athenaOutputLocation)) :: started
      addPartitionFrom(fromTime.plusHours(1), list)
    }
  }

  if (reportingWindow.reIndex) {
    // reduceOption instead of reduce: the list is empty when the window's start is
    // already after its end, and reduce would then throw instead of returning a Future.
    addPartitionFrom(reportingWindow.start)
      .reduceOption((a, b) => a.flatMap(_ => b))
      .getOrElse(Future.successful(""))
  } else {
    Future.successful("")
  }
}