def fetchQueryResponse[T]()

in commoneventconsumer/src/main/scala/com/gu/notifications/athena/Athena.scala [68:83]


  def fetchQueryResponse[T](id: String, transform: List[List[String]] => T)(implicit athenaAsync: AmazonAthenaAsync): Future[T] = {
    def readAndProcessNext(getQueryResultsResult: GetQueryResultsResult, last: List[List[String]] = Nil): Future[List[List[String]]] = {
      val next = last ++ getQueryResultsResult.getResultSet.getRows.asScala.toList.map(row => row.getData.asScala.map(_.getVarCharValue).toList)
      Option(getQueryResultsResult.getNextToken) match {
        case Some(token) => asyncHandle[GetQueryResultsRequest, GetQueryResultsResult](asyncHandler =>
          athenaAsync.getQueryResultsAsync(new GetQueryResultsRequest().withQueryExecutionId(id).withNextToken(token), asyncHandler)).flatMap(readAndProcessNext(_, next))
        case None => Future.successful(next)
      }
    }

    asyncHandle[GetQueryResultsRequest, GetQueryResultsResult](asyncHandler => athenaAsync.getQueryResultsAsync(new GetQueryResultsRequest().withQueryExecutionId(id), asyncHandler))
      .flatMap(readAndProcessNext(_)).map {
      case _ :: tail => tail
      case _ => Nil
    }.map(transform(_))
  }