in plugins/inputs/statsd/statsd.go [609:749]
func (s *Statsd) parseStatsdLine(line string) error {
lineTags := make(map[string]string)
if s.DataDogExtensions {
recombinedSegments := make([]string, 0)
// datadog tags look like this:
// users.online:1|c|@0.5|#country:china,environment:production
// users.online:1|c|#sometagwithnovalue
// we will split on the pipe and remove any elements that are datadog
// tags, parse them, and rebuild the line sans the datadog tags
pipesplit := strings.Split(line, "|")
for _, segment := range pipesplit {
if len(segment) > 0 && segment[0] == '#' {
// we have ourselves a tag; they are comma separated
parseDataDogTags(lineTags, segment[1:])
} else {
recombinedSegments = append(recombinedSegments, segment)
}
}
line = strings.Join(recombinedSegments, "|")
}
// Validate splitting the line on ":"
bits := strings.Split(line, ":")
if len(bits) < 2 {
s.Log.Errorf("Splitting ':', unable to parse metric: %s", line)
return errParsing
}
// Extract bucket name from individual metric bits
bucketName, bits := bits[0], bits[1:]
// Add a metric for each bit available
for _, bit := range bits {
m := metric{}
m.bucket = bucketName
// Validate splitting the bit on "|"
pipesplit := strings.Split(bit, "|")
if len(pipesplit) < 2 {
s.Log.Errorf("Splitting '|', unable to parse metric: %s", line)
return errParsing
} else if len(pipesplit) > 2 {
sr := pipesplit[2]
if strings.Contains(sr, "@") && len(sr) > 1 {
samplerate, err := strconv.ParseFloat(sr[1:], 64)
if err != nil {
s.Log.Errorf("Parsing sample rate: %s", err.Error())
} else {
// sample rate successfully parsed
m.samplerate = samplerate
}
} else {
s.Log.Debugf("Sample rate must be in format like: "+
"@0.1, @0.5, etc. Ignoring sample rate for line: %s", line)
}
}
// Validate metric type
switch pipesplit[1] {
case "g", "c", "s", "ms", "h", "d":
m.mtype = pipesplit[1]
default:
s.Log.Errorf("Metric type %q unsupported", pipesplit[1])
return errParsing
}
// Parse the value
if strings.HasPrefix(pipesplit[0], "-") || strings.HasPrefix(pipesplit[0], "+") {
if m.mtype != "g" && m.mtype != "c" {
s.Log.Errorf("+- values are only supported for gauges & counters, unable to parse metric: %s", line)
return errParsing
}
m.additive = true
}
switch m.mtype {
case "g", "ms", "h", "d":
v, err := strconv.ParseFloat(pipesplit[0], 64)
if err != nil {
s.Log.Errorf("Parsing value to float64, unable to parse metric: %s", line)
return errParsing
}
m.floatvalue = v
case "c":
var v int64
v, err := strconv.ParseInt(pipesplit[0], 10, 64)
if err != nil {
v2, err2 := strconv.ParseFloat(pipesplit[0], 64)
if err2 != nil {
s.Log.Errorf("Parsing value to int64, unable to parse metric: %s", line)
return errParsing
}
v = int64(v2)
}
// If a sample rate is given with a counter, divide value by the rate
if m.samplerate != 0 && m.mtype == "c" {
v = int64(float64(v) / m.samplerate)
}
m.intvalue = v
case "s":
m.strvalue = pipesplit[0]
}
// Parse the name & tags from bucket
m.name, m.field, m.tags = s.parseName(m.bucket)
switch m.mtype {
case "c":
m.tags["metric_type"] = "counter"
case "g":
m.tags["metric_type"] = "gauge"
case "s":
m.tags["metric_type"] = "set"
case "ms":
m.tags["metric_type"] = "timing"
case "h":
m.tags["metric_type"] = "histogram"
case "d":
m.tags["metric_type"] = "distribution"
}
if len(lineTags) > 0 {
for k, v := range lineTags {
m.tags[k] = v
}
}
// Make a unique key for the measurement name/tags
var tg []string
for k, v := range m.tags {
tg = append(tg, k+"="+v)
}
sort.Strings(tg)
tg = append(tg, m.name)
m.hash = strings.Join(tg, "")
s.aggregate(m)
}
return nil
}