private def addS3PartitionsToAthenaIndex()

in eventconsumer/src/main/scala/com/gu/notifications/events/AthenaMetrics.scala [30:60]


  /**
    * Issues one `ALTER TABLE ... ADD IF NOT EXISTS PARTITION` Athena query per hour of the
    * reporting window, so that the hourly S3 prefixes under the ingest location become
    * queryable through the `raw_events_<stage>` table.
    *
    * Nothing is submitted unless `reportingWindow.reIndex` is set.
    *
    * @param reportingWindow      supplies the inclusive `start`/`end` bounds that are walked in
    *                             one-hour steps, and the `reIndex` flag that gates the whole operation
    * @param athenaDatabase       database name passed to every `Query`
    * @param athenaOutputLocation output location passed to every `Query`
    * @return a future that completes once every submitted query future has completed in turn,
    *         carrying the value of the query for the first hour of the window (presumably an
    *         Athena query execution id - confirm against `Athena.startQuery`); an empty string
    *         when re-indexing is disabled or the window contains no hours
    */
  private def addS3PartitionsToAthenaIndex(
    reportingWindow: ReportingWindow,
    athenaDatabase: String,
    athenaOutputLocation: String
  )(implicit amazonAthenaAsync: AmazonAthenaAsync, scheduledExecutorService: ScheduledExecutorService
  ): Future[String] = {
    // Walks the window hour by hour, starting a query for each hour as it goes. Results are
    // prepended, so the returned list is in reverse chronological order.
    @tailrec
    def addPartitionFrom(fromTime: ZonedDateTime, started: List[Future[String]] = List()): List[Future[String]] = {
      if (fromTime.isAfter(reportingWindow.end)) {
        started
      }
      else {
        // Zero-padded two-digit hour, used both in the partition spec and in the S3 prefix.
        val hour = f"${fromTime.getHour}%02d"
        val date = Athena.toQueryDate(fromTime)
        val request = s"""
             |ALTER TABLE raw_events_$stage
             |ADD IF NOT EXISTS PARTITION (date='$date', hour=$hour)
             |LOCATION '${envDependencies.ingestLocation}/date=$date/hour=$hour/'
           """.stripMargin
        val list = Athena.startQuery(Query(athenaDatabase, request, athenaOutputLocation)) :: started
        addPartitionFrom(fromTime.plusHours(1), list)
      }
    }

    if (reportingWindow.reIndex) {
      // All queries have already been started at this point; the flatMap chain only waits for
      // them one after another. reduceOption (rather than reduce) keeps an empty window
      // (start after end) from throwing UnsupportedOperationException outside the Future.
      addPartitionFrom(reportingWindow.start)
        .reduceOption((a, b) => a.flatMap(_ => b))
        .getOrElse(Future.successful(""))
    } else {
      Future.successful("")
    }
  }