in go/rows.go [134:226]
// fetchNextPage requests one page of results for r.queryID from Athena,
// stores it in r.ResultOutput and normalizes it so that callers can consume
// r.ResultOutput.ResultSet.Rows directly.
//
// token is the pagination token of the page to fetch; nil requests the first
// page.
//
// Normalization performed on a successfully fetched page:
//   - If rows carry more cells than there is ColumnInfo, synthetic string
//     columns named "_col<N>" (N being the 0-based column index) are appended.
//   - If rows carry a single cell but several columns are declared, each cell
//     is split on tabs and, when the piece count matches the column count,
//     replaced by a multi-cell row.
//   - If there are no rows but a positive UpdateCount and a single column
//     named "rows", a one-cell row holding the update count is synthesized.
//   - On the first page, a leading row that merely repeats the column names
//     is dropped.
//
// r.reachedLastPage is set when the request fails, when the response carries
// no result set, or when no data row is left after header removal. The error
// from GetQueryResultsWithContext is returned unchanged.
func (r *Rows) fetchNextPage(token *string) error {
	var err error
	r.ResultOutput, err = r.athena.GetQueryResultsWithContext(r.ctx,
		&athena.GetQueryResultsInput{
			QueryExecutionId: aws.String(r.queryID),
			NextToken:        token,
		})
	if err != nil {
		r.tracer.Scope().Counter(DriverName + ".failure.fetchnextpage.getqueryresults").Inc(1)
		r.tracer.Log(ErrorLevel, "GetQueryResults failed", zap.String("error", err.Error()))
		r.reachedLastPage = true
		return err
	}
	r.pageCount++

	// A response without a result set has nothing to iterate over; treat it
	// like an empty page instead of dereferencing a nil pointer below.
	if r.ResultOutput == nil || r.ResultOutput.ResultSet == nil {
		r.reachedLastPage = true
		return nil
	}
	rs := r.ResultOutput.ResultSet
	meta := rs.ResultSetMetadata

	// First row of the first page contains the header if the query is not DDL.
	// The column names are also available in ResultSetMetadata.ColumnInfo.
	// Sometimes the Athena Go API returns row data without corresponding
	// ColumnInfo. To work around that, the missing columns are named
	// `_col` + 0-based column index. One example is:
	//   input:
	//     MSCK REPAIR TABLE sampledb.elb_logs
	//   output:
	//     _col0
	//     Partitions not in metastore: elb_logs:2015/01/01 elb_logs:2015/01/02 elb_logs:2015/01/03
	//     elb_logs:2015/01/04 elb_logs:2015/01/05 elb_logs:2015/01/06 elb_logs:2015/01/07
	if meta != nil && meta.ColumnInfo != nil {
		rowLen := len(rs.Rows)
		colLen := len(meta.ColumnInfo)
		switch {
		case rowLen > 0:
			rowColLen := len(rs.Rows[0].Data)
			if colLen < rowColLen {
				// More cells than declared columns: pad the metadata.
				for i := colLen; i < rowColLen; i++ {
					meta.ColumnInfo = append(meta.ColumnInfo,
						newColumnInfo("_col"+strconv.Itoa(i), "string"))
				}
			} else if colLen > rowColLen && rowColLen == 1 {
				// Fewer cells than declared columns: the single cell may hold
				// all values joined by tabs.
				for k, row := range rs.Rows {
					// Rows with no data or a NULL cell cannot be split; leave
					// them untouched rather than panicking.
					if row == nil || len(row.Data) == 0 ||
						row.Data[0] == nil || row.Data[0].VarCharValue == nil {
						continue
					}
					items := strings.Split(*row.Data[0].VarCharValue, "\t")
					if len(items) != colLen {
						continue
					}
					for i, v := range items {
						items[i] = strings.TrimSpace(v)
					}
					rs.Rows[k] = newRow(colLen, items)
				}
			}
		case rowLen == 0 && colLen == 1 && r.ResultOutput.UpdateCount != nil:
			col := meta.ColumnInfo[0]
			if *r.ResultOutput.UpdateCount > 0 &&
				col != nil && col.Name != nil && *col.Name == "rows" {
				// For DML's INSERT INTO, DDL's CTAS
				updateCount := strconv.FormatInt(*r.ResultOutput.UpdateCount, 10)
				rs.Rows = append(rs.Rows, &athena.Row{
					Data: []*athena.Datum{&athena.Datum{VarCharValue: &updateCount}},
				})
			}
		}
	}

	// Header detection only applies to the first page.
	// NOTE(review): this relies on pageCount being -1 before the first fetch,
	// so that the increment above yields 0 - confirm against the constructor.
	rowOffset := 0
	if r.pageCount == 0 && meta != nil {
		ci := meta.ColumnInfo
		if len(ci) > 0 && len(rs.Rows) > 0 && len(rs.Rows[0].Data) == len(ci) {
			// The first row is a header only if every cell equals the name of
			// its column; a missing name or value means it is real data.
			isHeader := true
			for i, col := range ci {
				cell := rs.Rows[0].Data[i]
				if col == nil || col.Name == nil ||
					cell == nil || cell.VarCharValue == nil ||
					*col.Name != *cell.VarCharValue {
					isHeader = false
					break
				}
			}
			if isHeader {
				rowOffset = 1
			}
		}
	}

	// If there is no new row, we should not continue; this also covers the
	// case where Rows is nil.
	if len(rs.Rows) <= rowOffset {
		r.reachedLastPage = true
		return nil
	}
	rs.Rows = rs.Rows[rowOffset:]
	return nil
}