func()

in plugins/inputs/sql/sql.go [129:274]


// parse converts every row in rows into one metric and adds it to acc.
//
// For each row the columns are evaluated in result order:
//   - the column named q.MeasurementColumn (if set) replaces q.Measurement as
//     the metric name; string and []byte values are accepted,
//   - the column named q.TimeColumn (if set) replaces t as the metric
//     timestamp; non-time.Time values are parsed using q.TimeFormat,
//   - columns matching q.tagFilter become tags (whitespace-trimmed, empty
//     values are dropped),
//   - columns matching one of the typed field filters (float, int, uint, bool,
//     string — checked in that order) are converted explicitly,
//   - remaining columns matching q.fieldFilter are converted automatically;
//     nil values are skipped.
//
// The measurement and time columns are not skipped after being consumed; they
// are still subject to the tag and field filters.
//
// It returns the number of rows added to acc. If reading the column names,
// scanning, or converting any value fails, it returns 0 and the error, even
// though metrics for earlier rows may already have been added to acc. An
// iteration error reported by rows.Err() is returned together with the number
// of rows processed so far. ctx is currently not used by this function.
func (q *Query) parse(ctx context.Context, acc telegraf.Accumulator, rows *dbsql.Rows, t time.Time) (int, error) {
	columnNames, err := rows.Columns()
	if err != nil {
		return 0, err
	}

	// Scan needs pointers as destinations; columnDataPtr[i] points at
	// columnData[i] so both slices can be reused for every row.
	columnData := make([]interface{}, len(columnNames))
	columnDataPtr := make([]interface{}, len(columnNames))

	for i := range columnData {
		columnDataPtr[i] = &columnData[i]
	}

	rowCount := 0
	for rows.Next() {
		// Per-row defaults, possibly overridden by dedicated columns below.
		measurement := q.Measurement
		timestamp := t
		tags := make(map[string]string)
		fields := make(map[string]interface{}, len(columnNames))

		// Do the parsing with (hopefully) automatic type conversion
		if err := rows.Scan(columnDataPtr...); err != nil {
			return 0, err
		}

		for i, name := range columnNames {
			if q.MeasurementColumn != "" && name == q.MeasurementColumn {
				// When scanning into an interface{}, the driver's value is
				// passed through unconverted, so a textual column may show up
				// as []byte instead of string depending on the driver.
				switch raw := columnData[i].(type) {
				case string:
					measurement = raw
				case []byte:
					measurement = string(raw)
				default:
					return 0, fmt.Errorf("measurement column type \"%T\" unsupported", columnData[i])
				}
			}

			if q.TimeColumn != "" && name == q.TimeColumn {
				var fieldvalue interface{}
				var skipParsing bool

				switch v := columnData[i].(type) {
				case string, int, int8, int16, int32, int64, uint, uint8, uint16, uint32, uint64, float32, float64:
					fieldvalue = v
				case []byte:
					fieldvalue = string(v)
				case time.Time:
					// Already a timestamp, no format-based parsing required.
					timestamp = v
					skipParsing = true
				case fmt.Stringer:
					fieldvalue = v.String()
				default:
					return 0, fmt.Errorf("time column %q of type \"%T\" unsupported", name, columnData[i])
				}
				if !skipParsing {
					if timestamp, err = internal.ParseTimestamp(q.TimeFormat, fieldvalue, ""); err != nil {
						return 0, fmt.Errorf("parsing time failed: %w", err)
					}
				}
			}

			// No "continue" here: a tag column may additionally be emitted as
			// a field if it matches one of the field filters.
			if q.tagFilter.Match(name) {
				tagvalue, err := internal.ToString(columnData[i])
				if err != nil {
					return 0, fmt.Errorf("converting tag column %q failed: %w", name, err)
				}
				if v := strings.TrimSpace(tagvalue); v != "" {
					tags[name] = v
				}
			}

			// Explicit type conversions take precedence
			if q.fieldFilterFloat.Match(name) {
				v, err := internal.ToFloat64(columnData[i])
				if err != nil {
					return 0, fmt.Errorf("converting field column %q to float failed: %w", name, err)
				}
				fields[name] = v
				continue
			}

			if q.fieldFilterInt.Match(name) {
				v, err := internal.ToInt64(columnData[i])
				if err != nil {
					return 0, fmt.Errorf("converting field column %q to int failed: %w", name, err)
				}
				fields[name] = v
				continue
			}

			if q.fieldFilterUint.Match(name) {
				v, err := internal.ToUint64(columnData[i])
				if err != nil {
					return 0, fmt.Errorf("converting field column %q to uint failed: %w", name, err)
				}
				fields[name] = v
				continue
			}

			if q.fieldFilterBool.Match(name) {
				v, err := internal.ToBool(columnData[i])
				if err != nil {
					return 0, fmt.Errorf("converting field column %q to bool failed: %w", name, err)
				}
				fields[name] = v
				continue
			}

			if q.fieldFilterString.Match(name) {
				v, err := internal.ToString(columnData[i])
				if err != nil {
					return 0, fmt.Errorf("converting field column %q to string failed: %w", name, err)
				}
				fields[name] = v
				continue
			}

			// Try automatic conversion for all remaining fields
			if q.fieldFilter.Match(name) {
				var fieldvalue interface{}
				switch v := columnData[i].(type) {
				case string, int, int8, int16, int32, int64, uint, uint8, uint16, uint32, uint64, float32, float64, bool:
					fieldvalue = v
				case []byte:
					fieldvalue = string(v)
				case time.Time:
					fieldvalue = v.UnixNano()
				case nil:
					fieldvalue = nil
				case fmt.Stringer:
					fieldvalue = v.String()
				default:
					return 0, fmt.Errorf("field column %q of type \"%T\" unsupported", name, columnData[i])
				}
				// NULL columns are left out of the metric instead of being
				// stored as a nil field.
				if fieldvalue != nil {
					fields[name] = fieldvalue
				}
			}
		}
		acc.AddFields(measurement, fields, tags, timestamp)
		rowCount++
	}

	// rows.Next() returning false may also indicate an iteration error.
	if err := rows.Err(); err != nil {
		return rowCount, err
	}

	return rowCount, nil
}