in commoneventconsumer/src/main/scala/com/gu/notifications/athena/Athena.scala [68:83]
/**
  * Reads every result page of the query execution `id` and passes the collected rows to `transform`.
  *
  * Pages are requested one after another, following the `nextToken` of each response until a
  * response carries no token. Each row becomes a `List[String]` of its cells' `getVarCharValue`
  * (used as returned, no null handling is applied here). The very first row of the combined
  * result is dropped before `transform` is called -- presumably the column-header row; confirm
  * against the queries this is used with.
  *
  * @param id          the Athena query execution id whose results are fetched
  * @param transform   converts the rows (first row removed) into the caller's result type
  * @param athenaAsync the asynchronous Athena client used to issue the requests
  * @tparam T          result type produced by `transform`
  * @return a future holding `transform` applied to the rows; it fails if any page request fails
  */
def fetchQueryResponse[T](id: String, transform: List[List[String]] => T)(implicit athenaAsync: AmazonAthenaAsync): Future[T] = {
  // Issues a single GetQueryResults call; `None` requests the first page.
  def requestPage(nextToken: Option[String]): Future[GetQueryResultsResult] =
    asyncHandle[GetQueryResultsRequest, GetQueryResultsResult] { asyncHandler =>
      val request = new GetQueryResultsRequest().withQueryExecutionId(id)
      athenaAsync.getQueryResultsAsync(nextToken.fold(request)(token => request.withNextToken(token)), asyncHandler)
    }

  // Appends the rows of `page` to `collected` and recurses while the response carries a token.
  // A Vector is used as the accumulator: appending a page to an immutable List would copy every
  // previously collected row on each page, which gets expensive for results with many pages.
  def collectRows(page: GetQueryResultsResult, collected: Vector[List[String]]): Future[Vector[List[String]]] = {
    val pageRows = page.getResultSet.getRows.asScala.map(row => row.getData.asScala.map(_.getVarCharValue).toList)
    val rowsSoFar = collected ++ pageRows
    Option(page.getNextToken) match {
      case Some(token) => requestPage(Some(token)).flatMap(collectRows(_, rowsSoFar))
      case None => Future.successful(rowsSoFar)
    }
  }

  requestPage(None)
    .flatMap(collectRows(_, Vector.empty))
    // drop(1) removes the first row and leaves an empty result empty.
    .map(rows => transform(rows.drop(1).toList))
}