plugins/parsers/influx/influx_upstream/parser.go (230 lines of code) (raw):
package influx_upstream
import (
"errors"
"fmt"
"io"
"strings"
"time"
"github.com/influxdata/line-protocol/v2/lineprotocol"
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/metric"
)
// maxErrorBufferSize is the largest number of bytes of an offending line that
// ParseError.Error quotes before it starts trimming the line.
const maxErrorBufferSize = 1024
// ErrNoMetric is returned by ParseLine when the input yields no metric.
var ErrNoMetric = errors.New("no metric in line")

// ErrEOF is returned by StreamParser.Next when the decoder has no further
// entries and no new decoder error to report.
var ErrEOF = errors.New("EOF")
// TimeFunc supplies the timestamp that is handed to the decoder as the
// default time when a metric is parsed.
type TimeFunc func() time.Time
// nthIndexAny returns the byte index in s of the nth (one-based) occurrence of
// any Unicode code point from chars, or -1 if there are fewer than n
// occurrences or n is not positive.
func nthIndexAny(s, chars string, n int) int {
	if n <= 0 {
		return -1
	}

	found := 0
	// Ranging over the string yields the byte index of every rune, so the
	// scan always resumes right after the matched rune regardless of how many
	// code points chars holds or how wide the matched rune is.
	for i, r := range s {
		if !strings.ContainsRune(chars, r) {
			continue
		}
		found++
		if found == n {
			return i
		}
	}

	return -1
}
// ParseError indicates an error in the parsing of the text.
type ParseError struct {
// DecodeError is the underlying decoder error; its Err, Line and Column
// values are used to build the error message.
*lineprotocol.DecodeError
// buf is the input that was being parsed, or "" when it is not available.
buf string
}
// Error formats the failure as "metric parse error: <err> at <line>:<column>".
// When the parsed input is available, the quoted offending line is appended;
// lines longer than maxErrorBufferSize are trimmed to the part leading up to
// the error column and marked with "<-- here".
func (e *ParseError) Error() string {
	// When an error occurs within the stream decoder, we do not have access
	// to the internal buffer, so we cannot display the contents of the invalid
	// metric
	if e.buf == "" {
		return fmt.Sprintf("metric parse error: %s at %d:%d", e.Err, e.Line, e.Column)
	}

	// Line is one-based, so the offending line begins right after the
	// (Line-1)th newline. A miss yields -1, which maps to the buffer start.
	lineStart := nthIndexAny(e.buf, "\n", int(e.Line-1)) + 1
	buffer := e.buf[lineStart:]
	if eol := strings.IndexAny(buffer, "\n"); eol >= 0 {
		buffer = strings.TrimSuffix(buffer[:eol], "\r")
	}

	if len(buffer) > maxErrorBufferSize {
		// Column is a one-based byte position within the offending line, and
		// buffer already starts at that line, so it indexes buffer directly.
		offset := e.Column - 1
		if offset > len(buffer) || offset < 0 {
			offset = len(buffer)
		}

		// Keep at most maxErrorBufferSize bytes leading up to the error and
		// only announce an ellipsis when something was really cut off.
		start := offset - maxErrorBufferSize
		if start < 0 {
			start = 0
		}
		startEllipsis := start > 0

		// if we trimmed it the column won't line up, so point it out
		// explicitly to make it obvious where the issue is.
		buffer = buffer[start:offset] + "<-- here"
		if startEllipsis {
			buffer = "..." + buffer
		}
	}

	return fmt.Sprintf("metric parse error: %s at %d:%d: %q", e.Err, e.Line, e.Column, buffer)
}
// convertToParseError attempts to convert a lineprotocol.DecodeError to a
// ParseError that also carries the parsed input. A DecodeError anywhere in
// rawErr's wrap chain is recognised; any other error is returned unchanged.
func convertToParseError(input []byte, rawErr error) error {
	var decodeErr *lineprotocol.DecodeError
	if !errors.As(rawErr, &decodeErr) {
		return rawErr
	}

	return &ParseError{
		DecodeError: decodeErr,
		buf:         string(input),
	}
}
// Parser is an InfluxDB Line Protocol parser that implements the
// parsers.Parser interface.
type Parser struct {
// DefaultTags are added to every parsed metric that does not already
// carry a tag with the same key.
DefaultTags map[string]string
// defaultTime produces the default timestamp passed to the decoder.
defaultTime TimeFunc
// precision is the timestamp precision passed to the decoder.
precision lineprotocol.Precision
// allowPartial makes parsing tolerate empty tag names, missing field keys
// and timestamp errors; NewSeriesParser enables it.
allowPartial bool
}
// NewParser returns a Parser that accepts line protocol. The parser uses
// time.Now as its default time source and nanosecond timestamp precision.
func NewParser() *Parser {
	p := new(Parser)
	p.defaultTime = time.Now
	p.precision = lineprotocol.Nanosecond
	return p
}
// NewSeriesParser returns a Parser that accepts a measurement and tagset.
// It is configured like NewParser but with partial entries allowed.
func NewSeriesParser() *Parser {
	p := new(Parser)
	p.allowPartial = true
	p.defaultTime = time.Now
	p.precision = lineprotocol.Nanosecond
	return p
}
// SetTimeFunc replaces the function that produces the default timestamp
// passed to the decoder. NewParser and NewSeriesParser set it to time.Now.
func (p *Parser) SetTimeFunc(f TimeFunc) {
p.defaultTime = f
}
// Parse decodes every entry in input into a metric and then applies the
// parser's default tags. The first entry that fails to parse aborts the call:
// no metrics are returned and decoder errors are converted to a ParseError
// that carries the input.
func (p *Parser) Parse(input []byte) ([]telegraf.Metric, error) {
	decoder := lineprotocol.NewDecoderWithBytes(input)

	// Start from a non-nil empty slice so callers get an empty result rather
	// than nil when the input holds no entries.
	parsed := []telegraf.Metric{}
	for decoder.Next() {
		next, err := nextMetric(decoder, p.precision, p.defaultTime, p.allowPartial)
		if err != nil {
			return nil, convertToParseError(input, err)
		}
		parsed = append(parsed, next)
	}

	p.applyDefaultTags(parsed)
	return parsed, nil
}
// ParseLine parses line with Parse and returns the first resulting metric.
// It returns ErrNoMetric when parsing succeeds but produces no metric.
func (p *Parser) ParseLine(line string) (telegraf.Metric, error) {
	parsed, err := p.Parse([]byte(line))
	switch {
	case err != nil:
		return nil, err
	case len(parsed) == 0:
		return nil, ErrNoMetric
	default:
		return parsed[0], nil
	}
}
// SetDefaultTags replaces the parser's default tags. The map is stored as
// given, not copied.
func (p *Parser) SetDefaultTags(tags map[string]string) {
p.DefaultTags = tags
}
// SetTimePrecision selects the timestamp precision passed to the decoder.
// Only time.Nanosecond, time.Microsecond, time.Millisecond and time.Second
// are recognised; any other value leaves the current precision unchanged.
func (p *Parser) SetTimePrecision(u time.Duration) {
	supported := map[time.Duration]lineprotocol.Precision{
		time.Nanosecond:  lineprotocol.Nanosecond,
		time.Microsecond: lineprotocol.Microsecond,
		time.Millisecond: lineprotocol.Millisecond,
		time.Second:      lineprotocol.Second,
	}
	if precision, ok := supported[u]; ok {
		p.precision = precision
	}
}
// applyDefaultTags applies the parser's default tags to each metric in
// metrics. It does nothing when no default tags are configured.
func (p *Parser) applyDefaultTags(metrics []telegraf.Metric) {
	if len(p.DefaultTags) > 0 {
		for i := range metrics {
			p.applyDefaultTagsSingle(metrics[i])
		}
	}
}
// applyDefaultTagsSingle adds each default tag to m unless m already has a
// tag with that key, so tags from the parsed input take precedence.
func (p *Parser) applyDefaultTagsSingle(m telegraf.Metric) {
	for name, value := range p.DefaultTags {
		if m.HasTag(name) {
			continue
		}
		m.AddTag(name, value)
	}
}
// StreamParser is an InfluxDB Line Protocol parser. It is not safe for
// concurrent use in multiple goroutines.
type StreamParser struct {
// decoder reads entries from the reader given to NewStreamParser.
decoder *lineprotocol.Decoder
// defaultTime produces the default timestamp passed to the decoder.
defaultTime TimeFunc
// precision is the timestamp precision passed to the decoder.
precision lineprotocol.Precision
// lastError is the most recent decoder error returned by Next; it is kept
// so the same error is not reported twice.
lastError error
}
// NewStreamParser returns a StreamParser that decodes line protocol from r.
// It uses time.Now as its default time source and nanosecond timestamp
// precision.
func NewStreamParser(r io.Reader) *StreamParser {
	sp := new(StreamParser)
	sp.decoder = lineprotocol.NewDecoder(r)
	sp.defaultTime = time.Now
	sp.precision = lineprotocol.Nanosecond
	return sp
}
// SetTimeFunc changes the function used to determine the time of metrics
// without a timestamp. The default TimeFunc is time.Now. Useful mostly for
// testing, or if you want all metrics to have the same timestamp.
func (sp *StreamParser) SetTimeFunc(f TimeFunc) {
sp.defaultTime = f
}
// SetTimePrecision selects the timestamp precision passed to the decoder.
// Only time.Nanosecond, time.Microsecond, time.Millisecond and time.Second
// are recognised; any other value leaves the current precision unchanged.
func (sp *StreamParser) SetTimePrecision(u time.Duration) {
	supported := map[time.Duration]lineprotocol.Precision{
		time.Nanosecond:  lineprotocol.Nanosecond,
		time.Microsecond: lineprotocol.Microsecond,
		time.Millisecond: lineprotocol.Millisecond,
		time.Second:      lineprotocol.Second,
	}
	if precision, ok := supported[u]; ok {
		sp.precision = precision
	}
}
// Next parses the next item from the stream. You can repeat calls to this
// function if it returns ParseError to get the next metric or error.
//
// When the decoder has no further entries, a decoder error is returned the
// first time it is seen; otherwise, and on every later call, ErrEOF is
// returned.
func (sp *StreamParser) Next() (telegraf.Metric, error) {
	if !sp.decoder.Next() {
		// errors.Is is used instead of != because comparing interface values
		// panics when both hold the same non-comparable dynamic type, and the
		// decoder error may originate from an arbitrary io.Reader.
		if err := sp.decoder.Err(); err != nil && !errors.Is(err, sp.lastError) {
			sp.lastError = err
			return nil, err
		}
		return nil, ErrEOF
	}

	m, err := nextMetric(sp.decoder, sp.precision, sp.defaultTime, false)
	if err != nil {
		// No input is attached: the stream decoder's buffer is not accessible
		// here, so the resulting ParseError reports the position only.
		return nil, convertToParseError(nil, err)
	}

	return m, nil
}
// nextMetric builds a metric from the entry the decoder is positioned on by
// reading, in order, the measurement, all tags, all fields and the timestamp.
// defaultTime is called to supply the default time handed to the decoder.
//
// With allowPartial set, an "empty tag name" error ends the tag section, an
// "expected field key" error ends the field section, and a timestamp error is
// ignored, so that series-only input is accepted. Any other error aborts
// parsing and is returned as is.
func nextMetric(decoder *lineprotocol.Decoder, precision lineprotocol.Precision, defaultTime TimeFunc, allowPartial bool) (telegraf.Metric, error) {
	name, err := decoder.Measurement()
	if err != nil {
		return nil, err
	}
	result := metric.New(string(name), nil, nil, time.Time{})

	for {
		tagKey, tagValue, tagErr := decoder.NextTag()
		if tagErr != nil {
			// Allow empty tags for series parser
			tolerated := strings.Contains(tagErr.Error(), "empty tag name") && allowPartial
			if !tolerated {
				return nil, tagErr
			}
			break
		}
		// A nil key signals the end of the tag section.
		if tagKey == nil {
			break
		}
		result.AddTag(string(tagKey), string(tagValue))
	}

	for {
		fieldKey, fieldValue, fieldErr := decoder.NextField()
		if fieldErr != nil {
			// Allow empty fields for series parser
			tolerated := strings.Contains(fieldErr.Error(), "expected field key") && allowPartial
			if !tolerated {
				return nil, fieldErr
			}
			break
		}
		// A nil key signals the end of the field section.
		if fieldKey == nil {
			break
		}
		result.AddField(string(fieldKey), fieldValue.Interface())
	}

	fallback := defaultTime()
	timestamp, err := decoder.Time(precision, fallback)
	if err != nil && !allowPartial {
		return nil, err
	}
	// In partial mode the value returned alongside a timestamp error is
	// still stored on the metric.
	result.SetTime(timestamp)

	return result, nil
}