in scripts/go/analyzeCUR/analyzeCUR.go [202:281]
// sendQuery runs sql against the Athena database db, waits for the query to
// reach a terminal state and returns its result rows.
//
// Query output is written to the bucket
// "s3://aws-athena-query-results-<account>-<region>/". While the query is
// QUEUED or RUNNING its status is polled every two seconds; any terminal state
// other than SUCCEEDED is reported as an error that includes Athena's
// state-change reason when one is supplied.
//
// The first row of the result set is taken as the column-name header. Each
// later row becomes a map of column name to value appended to the response's
// Rows. A row is dropped when any of its named columns is NULL (nil) or when
// it yields no values at all.
//
// On error the returned AthenaResponse is the zero value.
func sendQuery(svc *athena.Athena, db string, sql string, account string, region string) (AthenaResponse, error) {
	var results AthenaResponse

	// Build the start request: query text, target database, output location.
	var s athena.StartQueryExecutionInput
	s.SetQueryString(sql)
	var q athena.QueryExecutionContext
	q.SetDatabase(db)
	s.SetQueryExecutionContext(&q)
	var r athena.ResultConfiguration
	r.SetOutputLocation("s3://aws-athena-query-results-" + account + "-" + region + "/")
	s.SetResultConfiguration(&r)

	result, err := svc.StartQueryExecution(&s)
	if err != nil {
		return AthenaResponse{}, errors.New("Error Querying Athena, StartQueryExecution: " + err.Error())
	}
	if result == nil || result.QueryExecutionId == nil {
		return AthenaResponse{}, errors.New("Error Querying Athena, StartQueryExecution: no QueryExecutionId returned")
	}
	queryID := *result.QueryExecutionId

	var qri athena.GetQueryExecutionInput
	qri.SetQueryExecutionId(queryID)

	// Poll until the query leaves the in-progress states. A freshly started
	// query is normally QUEUED before it is RUNNING, so both must be waited on.
	const pollInterval = 2 * time.Second
	var state, reason string
	for {
		qrop, err := svc.GetQueryExecution(&qri)
		if err != nil {
			return AthenaResponse{}, errors.New("Error Querying Athena, GetQueryExecution: " + err.Error())
		}
		state, reason = "", ""
		if qrop != nil && qrop.QueryExecution != nil && qrop.QueryExecution.Status != nil {
			status := qrop.QueryExecution.Status
			if status.State != nil {
				state = *status.State
			}
			if status.StateChangeReason != nil {
				reason = *status.StateChangeReason
			}
		}
		if state != athena.QueryExecutionStateQueued && state != athena.QueryExecutionStateRunning {
			break
		}
		time.Sleep(pollInterval)
	}
	if state != athena.QueryExecutionStateSucceeded {
		msg := "Error Querying Athena, completion state is NOT SUCCEEDED, state is: " + state
		if reason != "" {
			msg += ", reason: " + reason
		}
		return AthenaResponse{}, errors.New(msg)
	}

	var ip athena.GetQueryResultsInput
	ip.SetQueryExecutionId(queryID)

	// Walk every page of results. colNames lives outside the callback so the
	// header captured on the first page is reused for all later pages.
	var colNames []string
	err = svc.GetQueryResultsPages(&ip,
		func(page *athena.GetQueryResultsOutput, lastPage bool) bool {
			if page != nil && page.ResultSet != nil {
				for _, row := range page.ResultSet.Rows {
					if row == nil {
						continue
					}
					if len(colNames) == 0 {
						// First row holds the column names used as map keys.
						for _, d := range row.Data {
							name := ""
							if d != nil && d.VarCharValue != nil {
								name = *d.VarCharValue
							}
							colNames = append(colNames, name)
						}
						continue
					}
					record := make(map[string]string, len(colNames))
					hasNull := false
					for j, d := range row.Data {
						if j >= len(colNames) {
							break // ignore cells beyond the known columns
						}
						if d == nil || d.VarCharValue == nil {
							hasNull = true
							break
						}
						record[colNames[j]] = *d.VarCharValue
					}
					if !hasNull && len(record) > 0 {
						results.Rows = append(results.Rows, record)
					}
				}
			}
			return !lastPage // false ends the paginated calls
		})
	if err != nil {
		return AthenaResponse{}, errors.New("Error Querying Athena, GetQueryResultsPages: " + err.Error())
	}
	return results, nil
}