in plugins/inputs/sysstat/sysstat.go [216:299]
// parse runs the command configured in s.Sadf with the arguments produced by
// s.sadfOptions(option), reads its standard output as tab-separated records of
// exactly six fields, and adds the resulting metrics to acc with timestamp ts.
//
// For every record, field 3 is the device, field 4 the metric name and field 5
// the value, which must parse as a float64. A device of "-" yields no tags;
// any other device is added as the "device" tag together with the extra tags
// configured for it in s.DeviceTags.
//
// When s.Group is true, all values for one device are collected and emitted as
// a single metric named s.Options[option], with the escaped metric names as
// field keys. Otherwise each record is emitted immediately as its own metric
// named s.Options[option] + "_" + escaped metric name with a single "value"
// field.
//
// An error is returned if the command cannot be started, if a record cannot
// be read or parsed, or if waiting for the command fails. Metrics added to acc
// before a read/parse error are not rolled back.
func (s *Sysstat) parse(acc telegraf.Accumulator, option string, ts time.Time) error {
	cmd := execCommand(s.Sadf, s.sadfOptions(option)...)
	stdout, err := cmd.StdoutPipe()
	if err != nil {
		return err
	}
	if err := cmd.Start(); err != nil {
		return fmt.Errorf("running command '%s' failed: %s", strings.Join(cmd.Args, " "), err)
	}

	// Once the command has started it must always be waited on, otherwise the
	// child is never reaped and the stdout pipe stays open. The normal path
	// waits explicitly below; this covers the early error returns.
	waited := false
	defer func() {
		if waited {
			return
		}
		// Best-effort cleanup: the parse error being returned is the one the
		// caller cares about, so failures here are deliberately ignored.
		_ = cmd.Process.Kill()
		_ = cmd.Wait()
	}()

	reader := csv.NewReader(bufio.NewReader(stdout))
	reader.Comma = '\t'
	reader.FieldsPerRecord = 6

	// groupData accumulates the tags and fields of one device when s.Group is
	// true.
	type groupData struct {
		tags   map[string]string
		fields map[string]interface{}
	}
	groups := make(map[string]groupData)
	baseName := s.Options[option]

	// deviceTags builds a fresh tag set for the given device.
	deviceTags := func(device string) map[string]string {
		tags := map[string]string{}
		if device == "-" {
			return tags
		}
		tags["device"] = device
		for _, extra := range s.DeviceTags[device] {
			for k, v := range extra {
				tags[k] = v
			}
		}
		return tags
	}

	for {
		record, err := reader.Read()
		if err == io.EOF {
			break
		}
		if err != nil {
			return err
		}

		device := record[3]
		value, err := strconv.ParseFloat(record[5], 64)
		if err != nil {
			return err
		}

		if !s.Group {
			acc.AddFields(
				baseName+"_"+escape(record[4]),
				map[string]interface{}{"value": value},
				deviceTags(device),
				ts,
			)
			continue
		}

		// Tags depend only on the device, so they are computed once per
		// group, when the device is first seen.
		g, ok := groups[device]
		if !ok {
			g = groupData{
				tags:   deviceTags(device),
				fields: make(map[string]interface{}),
			}
			groups[device] = g
		}
		g.fields[escape(record[4])] = value
	}

	if s.Group {
		for _, g := range groups {
			acc.AddFields(baseName, g.fields, g.tags, ts)
		}
	}

	// All output has been read; from here on WaitTimeout owns reaping the
	// command, so the deferred cleanup must not run as well.
	waited = true
	if err := internal.WaitTimeout(cmd, time.Second*5); err != nil {
		return fmt.Errorf("command %s failed with %s",
			strings.Join(cmd.Args, " "), err)
	}
	return nil
}