in eventconsumer/src/main/scala/com/gu/notifications/events/AthenaMetrics.scala [76:111]
/**
 * Processes one reporting window: builds the per-notification "received" count query,
 * registers partitions via `addS3PartitionsToAthenaIndex`, hands the query to
 * `routeFromQueryToUpdateDynamoDb`, and blocks until that pipeline finishes.
 *
 * The query counts rows per `notificationid`, in total and per platform
 * (`ios`, `android`, `ios-edition`, `android-edition`), over:
 *  - every partition of the window's end date, when start and end fall on different dates, plus
 *  - partitions of the window's start date whose hour is at or after the start hour.
 * Rows whose provider is `comment`, or whose notification id is `unknown` or NULL, are excluded.
 *
 * @param reportingWindow the window whose `start` and `end` drive the partition filter and the
 *                        output location (`<base>/<end date>/<end hour>`)
 * @throws java.util.concurrent.TimeoutException if the pipeline has not completed within 4 minutes
 *                                               (raised by `Await.result`); a failure of the
 *                                               underlying future is rethrown as well
 */
def handleRequest(reportingWindow: ReportingWindow)
  (implicit athenaAsyncClient: AmazonAthenaAsync, scheduledExecutorService: ScheduledExecutorService, dynamoDBAsyncClient: AmazonDynamoDBAsync): Unit = {
  // Query output is written under a folder keyed by the END of the window.
  val windowEnd = reportingWindow.end
  val outputLocation = s"${envDependencies.athenaOutputLocation}/${windowEnd.toLocalDate}/${windowEnd.getHour}"
  val database = envDependencies.athenaDatabase

  logger.info(s"Processing the reporting window $reportingWindow")

  // Pieces of the SQL text, named once so the template below stays readable.
  val tableName = s"notification_received_${stage.toLowerCase()}"
  val windowStart = reportingWindow.start
  val startDate = Athena.toQueryDate(windowStart)
  val endDate = Athena.toQueryDate(windowEnd)
  val startHour = windowStart.getHour

  // The date inequality is evaluated inside the SQL itself: when both dates are equal the
  // first branch is false and only the "start date, hour >= start hour" branch selects rows.
  val sql =
    s"""
       |SELECT
       | notificationid,
       | count(*) AS total,
       | count_if(platform = 'ios') AS ios,
       | count_if(platform = 'android') AS android,
       | count_if(platform = 'ios-edition') AS iosEdition,
       | count_if(platform = 'android-edition') AS androidEdition
       |FROM
       | ${tableName}
       |WHERE
       | (('${startDate}' != '${endDate}'
       | AND partition_date = '${endDate}'
       | ) OR (
       | partition_date = '${startDate}'
       | AND partition_hour >= ${startHour}
       | )) AND (provider != 'comment' OR provider IS NULL)
       | AND notificationid != 'unknown'
       | AND notificationid IS NOT NULL
       |GROUP BY
       | notificationid""".stripMargin

  val fetchEventsQuery = Query(database, sql, outputLocation)

  // Partition registration must complete before the aggregation query is started.
  val partitionsAdded = addS3PartitionsToAthenaIndex(reportingWindow, database, outputLocation)
  val pipeline = partitionsAdded.flatMap { _ =>
    routeFromQueryToUpdateDynamoDb(fetchEventsQuery, windowStart)
  }

  // Blocking here is intentional: the method returns Unit only once the work is done.
  val timeout = duration.Duration(4, TimeUnit.MINUTES)
  Await.result(pipeline, timeout)
}