def handleRequest()

in eventconsumer/src/main/scala/com/gu/notifications/events/AthenaMetrics.scala [76:111]


  /**
    * Runs the Athena aggregation for one reporting window and blocks until the
    * resulting pipeline has finished.
    *
    * Steps, in order:
    *  1. derive the Athena output location from the configured base location plus
    *     the date and hour of the window's end;
    *  2. build a query that counts rows per `notificationid`, in total and per
    *     platform (`ios`, `android`, `ios-edition`, `android-edition`);
    *  3. call `addS3PartitionsToAthenaIndex`, and once that succeeds, hand the query
    *     to `routeFromQueryToUpdateDynamoDb` together with the window start;
    *  4. wait up to four minutes for the combined future.
    *
    * @param reportingWindow the window whose `start` / `end` drive the partition
    *                        filter and the output location
    * @throws java.util.concurrent.TimeoutException if the pipeline has not completed
    *         within four minutes (raised by `Await.result`); a failure of the
    *         underlying future is rethrown as well
    */
  def handleRequest(reportingWindow: ReportingWindow)
    (implicit athenaAsyncClient: AmazonAthenaAsync, scheduledExecutorService: ScheduledExecutorService, dynamoDBAsyncClient: AmazonDynamoDBAsync): Unit = {
    // Output is written under <base>/<end date>/<end hour>.
    val outputLocation = s"${envDependencies.athenaOutputLocation}/${reportingWindow.end.toLocalDate.toString}/${reportingWindow.end.getHour}"
    val database = envDependencies.athenaDatabase

    logger.info(s"Processing the reporting window $reportingWindow")

    // Partition filter: when the window spans two dates, take the whole end-date
    // partition; in every case take the start-date partition from the start hour on.
    // Rows whose provider is 'comment' and rows without a usable notification id
    // are excluded.
    val countsPerNotificationSql = s"""
      |SELECT
      | 	notificationid,
      | 	count(*) AS total,
      | 	count_if(platform = 'ios') AS ios,
      | 	count_if(platform = 'android') AS android,
      | 	count_if(platform = 'ios-edition') AS iosEdition,
      | 	count_if(platform = 'android-edition') AS androidEdition
      |FROM
      |	 notification_received_${stage.toLowerCase()}
      |WHERE
      | 	(('${Athena.toQueryDate(reportingWindow.start)}' != '${Athena.toQueryDate(reportingWindow.end)}'
      | 		AND partition_date = '${Athena.toQueryDate(reportingWindow.end)}'
      | 	) OR (
      | 		partition_date = '${Athena.toQueryDate(reportingWindow.start)}'
      | 		AND partition_hour >= ${reportingWindow.start.getHour}
      | 	)) AND (provider != 'comment' OR provider IS NULL)
      |  AND notificationid != 'unknown'
      |  AND notificationid IS NOT NULL
      |GROUP BY
      |	 notificationid""".stripMargin

    val countsQuery = Query(database, countsPerNotificationSql, outputLocation)

    // The partitions must be registered before the query is routed onwards.
    val pipeline = for {
      _ <- addS3PartitionsToAthenaIndex(reportingWindow, database, outputLocation)
      outcome <- routeFromQueryToUpdateDynamoDb(countsQuery, reportingWindow.start)
    } yield outcome

    val timeout = duration.Duration(4, TimeUnit.MINUTES)
    Await.result(pipeline, timeout)
  }