in plugins/inputs/sql/sql.go [129:274]
func (q *Query) parse(ctx context.Context, acc telegraf.Accumulator, rows *dbsql.Rows, t time.Time) (int, error) {
columnNames, err := rows.Columns()
if err != nil {
return 0, err
}
// Prepare the list of datapoints according to the received row
columnData := make([]interface{}, len(columnNames))
columnDataPtr := make([]interface{}, len(columnNames))
for i := range columnData {
columnDataPtr[i] = &columnData[i]
}
rowCount := 0
for rows.Next() {
measurement := q.Measurement
timestamp := t
tags := make(map[string]string)
fields := make(map[string]interface{}, len(columnNames))
// Do the parsing with (hopefully) automatic type conversion
if err := rows.Scan(columnDataPtr...); err != nil {
return 0, err
}
for i, name := range columnNames {
if q.MeasurementColumn != "" && name == q.MeasurementColumn {
var ok bool
if measurement, ok = columnData[i].(string); !ok {
return 0, fmt.Errorf("measurement column type \"%T\" unsupported", columnData[i])
}
}
if q.TimeColumn != "" && name == q.TimeColumn {
var fieldvalue interface{}
var skipParsing bool
switch v := columnData[i].(type) {
case string, int, int8, int16, int32, int64, uint, uint8, uint16, uint32, uint64, float32, float64:
fieldvalue = v
case []byte:
fieldvalue = string(v)
case time.Time:
timestamp = v
skipParsing = true
case fmt.Stringer:
fieldvalue = v.String()
default:
return 0, fmt.Errorf("time column %q of type \"%T\" unsupported", name, columnData[i])
}
if !skipParsing {
if timestamp, err = internal.ParseTimestamp(q.TimeFormat, fieldvalue, ""); err != nil {
return 0, fmt.Errorf("parsing time failed: %v", err)
}
}
}
if q.tagFilter.Match(name) {
tagvalue, err := internal.ToString(columnData[i])
if err != nil {
return 0, fmt.Errorf("converting tag column %q failed: %v", name, err)
}
if v := strings.TrimSpace(tagvalue); v != "" {
tags[name] = v
}
}
// Explicit type conversions take precedence
if q.fieldFilterFloat.Match(name) {
v, err := internal.ToFloat64(columnData[i])
if err != nil {
return 0, fmt.Errorf("converting field column %q to float failed: %v", name, err)
}
fields[name] = v
continue
}
if q.fieldFilterInt.Match(name) {
v, err := internal.ToInt64(columnData[i])
if err != nil {
return 0, fmt.Errorf("converting field column %q to int failed: %v", name, err)
}
fields[name] = v
continue
}
if q.fieldFilterUint.Match(name) {
v, err := internal.ToUint64(columnData[i])
if err != nil {
return 0, fmt.Errorf("converting field column %q to uint failed: %v", name, err)
}
fields[name] = v
continue
}
if q.fieldFilterBool.Match(name) {
v, err := internal.ToBool(columnData[i])
if err != nil {
return 0, fmt.Errorf("converting field column %q to bool failed: %v", name, err)
}
fields[name] = v
continue
}
if q.fieldFilterString.Match(name) {
v, err := internal.ToString(columnData[i])
if err != nil {
return 0, fmt.Errorf("converting field column %q to string failed: %v", name, err)
}
fields[name] = v
continue
}
// Try automatic conversion for all remaining fields
if q.fieldFilter.Match(name) {
var fieldvalue interface{}
switch v := columnData[i].(type) {
case string, int, int8, int16, int32, int64, uint, uint8, uint16, uint32, uint64, float32, float64, bool:
fieldvalue = v
case []byte:
fieldvalue = string(v)
case time.Time:
fieldvalue = v.UnixNano()
case nil:
fieldvalue = nil
case fmt.Stringer:
fieldvalue = v.String()
default:
return 0, fmt.Errorf("field column %q of type \"%T\" unsupported", name, columnData[i])
}
if fieldvalue != nil {
fields[name] = fieldvalue
}
}
}
acc.AddFields(measurement, fields, tags, timestamp)
rowCount++
}
if err := rows.Err(); err != nil {
return rowCount, err
}
return rowCount, nil
}