in plugins/inputs/postgresql_extensible/postgresql_extensible.go [225:298]
// accRow scans one result row into a value per entry of p.OrderedColumns and
// reports it to acc as a single metric named meas_name.
//
// Every metric carries two tags: "server", set to the result of
// p.SanitizedAddress, and "db", set to the row's "datname" column when that
// column holds text and to "postgres" otherwise. Columns named in
// p.AdditionalTags are emitted as tags; columns present in ignoredColumns and
// columns whose value is NULL are skipped; every remaining column becomes a
// field, with []byte values converted to string.
//
// It returns the error from row.Scan or from p.SanitizedAddress, if any.
func (p *Postgresql) accRow(meas_name string, row scanner, acc telegraf.Accumulator) error {
	// columnMap stores each column name with its scan destination.
	columnMap := make(map[string]*interface{}, len(p.OrderedColumns))
	// columnVars holds the same destinations in result-set order. It is built
	// from p.OrderedColumns rather than from len(columnMap) so that a repeated
	// column name still yields one destination per column; otherwise Scan
	// would fail with an argument-count mismatch.
	columnVars := make([]interface{}, 0, len(p.OrderedColumns))
	for _, column := range p.OrderedColumns {
		dest, ok := columnMap[column]
		if !ok {
			dest = new(interface{})
			columnMap[column] = dest
		}
		columnVars = append(columnVars, dest)
	}

	// deconstruct array of variables and send to Scan
	if err := row.Scan(columnVars...); err != nil {
		return err
	}

	// Extract the database name from the column map. The value is inspected
	// with a type switch instead of a bare assertion so that a NULL datname or
	// a driver that hands text back as []byte cannot panic.
	var datname interface{}
	if c, ok := columnMap["datname"]; ok {
		datname = *c
	}
	var dbname bytes.Buffer
	switch v := datname.(type) {
	case string:
		dbname.WriteString(v)
	case []byte:
		dbname.Write(v)
	default:
		dbname.WriteString("postgres")
	}

	tagAddress, err := p.SanitizedAddress()
	if err != nil {
		return err
	}

	tags := map[string]string{
		"server": tagAddress,
		"db":     dbname.String(),
	}
	fields := make(map[string]interface{})

COLUMN:
	for col, val := range columnMap {
		// %v rather than %s: the value may be of any scanned type.
		log.Printf("D! postgresql_extensible: column: %s = %T: %v\n", col, *val, *val)
		if _, ignore := ignoredColumns[col]; ignore || *val == nil {
			continue
		}

		// Process the additional tags: a column listed there is reported as a
		// tag and never as a field, even when its type cannot be rendered.
		for _, tag := range p.AdditionalTags {
			if col != tag {
				continue
			}
			switch v := (*val).(type) {
			case string:
				tags[col] = v
			case []byte:
				tags[col] = string(v)
			case int64, int32, int:
				tags[col] = fmt.Sprintf("%d", v)
			default:
				log.Println("failed to add additional tag", col)
			}
			continue COLUMN
		}

		if v, ok := (*val).([]byte); ok {
			fields[col] = string(v)
		} else {
			fields[col] = *val
		}
	}

	acc.AddFields(meas_name, fields, tags)
	return nil
}