plugins/parsers/registry.go
package parsers
import (
"fmt"
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/plugins/parsers/collectd"
"github.com/influxdata/telegraf/plugins/parsers/dropwizard"
"github.com/influxdata/telegraf/plugins/parsers/form_urlencoded"
"github.com/influxdata/telegraf/plugins/parsers/graphite"
"github.com/influxdata/telegraf/plugins/parsers/grok"
"github.com/influxdata/telegraf/plugins/parsers/influx"
"github.com/influxdata/telegraf/plugins/parsers/influx/influx_upstream"
"github.com/influxdata/telegraf/plugins/parsers/json"
"github.com/influxdata/telegraf/plugins/parsers/json_v2"
"github.com/influxdata/telegraf/plugins/parsers/logfmt"
"github.com/influxdata/telegraf/plugins/parsers/nagios"
"github.com/influxdata/telegraf/plugins/parsers/prometheus"
"github.com/influxdata/telegraf/plugins/parsers/prometheusremotewrite"
"github.com/influxdata/telegraf/plugins/parsers/value"
"github.com/influxdata/telegraf/plugins/parsers/wavefront"
"github.com/influxdata/telegraf/plugins/parsers/xpath"
)
// Creator is the function to create a new parser
type Creator func(defaultMetricName string) telegraf.Parser
// Parsers contains the registry of all known parsers (following the new style)
var Parsers = map[string]Creator{}
// Add adds a parser to the registry. Usually this function is called in the plugin's init function.
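//
// Illustrative sketch (the "myformat" name and the plugin's Parser type are
// hypothetical, not part of this file): a parser plugin typically registers
// itself like
//
//	func init() {
//		parsers.Add("myformat",
//			func(defaultMetricName string) telegraf.Parser {
//				return &Parser{MetricName: defaultMetricName}
//			})
//	}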
func Add(name string, creator Creator) {
Parsers[name] = creator
}
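// ParserFunc is a factory function that returns a new, ready-to-use parser,
// or an error if the parser cannot be created.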
type ParserFunc func() (Parser, error)
// ParserInput is an interface for input plugins that are able to parse
// arbitrary data formats.
type ParserInput interface {
// SetParser sets the parser to be used by the plugin
SetParser(parser Parser)
}
// ParserFuncInput is an interface for input plugins that are able to parse
// arbitrary data formats.
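//
// Illustrative sketch (assumes a plugin value and an old-style parser config
// are in scope; not taken from this file): the configuration layer usually
// wires a plugin up with something like
//
//	plugin.SetParserFunc(func() (parsers.Parser, error) {
//		return parsers.NewParser(parserConfig)
//	})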
type ParserFuncInput interface {
// SetParserFunc sets a factory function that creates a new parser.
SetParserFunc(fn ParserFunc)
}
// Parser is an interface defining functions that a parser plugin must satisfy.
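//
// Illustrative usage sketch (the input data is a placeholder):
//
//	metrics, err := parser.Parse([]byte("cpu.usage.idle 90\ncpu.usage.busy 10"))
//	if err != nil {
//		// handle the parse error
//	}
//	for _, m := range metrics {
//		// process each telegraf.Metric
//		_ = m
//	}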
type Parser interface {
// Parse takes a byte buffer separated by newlines,
// e.g. `cpu.usage.idle 90\ncpu.usage.busy 10`,
// and parses it into telegraf metrics.
//
// Must be thread-safe.
Parse(buf []byte) ([]telegraf.Metric, error)
// ParseLine takes a single string metric,
// e.g. "cpu.usage.idle 90",
// and parses it into a telegraf metric.
//
// Must be thread-safe.
// This function is only called by plugins that expect line-based protocols.
// It does not need to be implemented by non-line-based parsers (e.g. JSON, XML).
ParseLine(line string) (telegraf.Metric, error)
// SetDefaultTags tells the parser to add all of the given tags
// to each parsed metric.
// NOTE: do _not_ modify the map after passing it here!
SetDefaultTags(tags map[string]string)
}
// ParserCompatibility is an interface for backward-compatible initialization of new parsers
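//
// Illustrative sketch (the Parser type and its fields are hypothetical): a
// new-style parser can support the old configuration path with
//
//	func (p *Parser) InitFromConfig(config *parsers.Config) error {
//		p.MetricName = config.MetricName
//		return nil
//	}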
type ParserCompatibility interface {
// InitFromConfig sets the parser internal variables from the old-style config
InitFromConfig(config *Config) error
}
// Config is a struct that covers the data types needed for all parser types,
// and can be used to instantiate _any_ of the parsers.
type Config struct {
// DataFormat selects one of the supported data formats, e.g. json, influx, graphite, value, nagios
DataFormat string `toml:"data_format"`
// Separator only applies to Graphite data.
Separator string `toml:"separator"`
// Templates only apply to Graphite data.
Templates []string `toml:"templates"`
// TagKeys only apply to JSON data.
TagKeys []string `toml:"tag_keys"`
// Array of glob patterns for keys that should be added as string fields.
JSONStringFields []string `toml:"json_string_fields"`
JSONNameKey string `toml:"json_name_key"`
// MetricName applies to JSON & value. This will be the name of the measurement.
MetricName string `toml:"metric_name"`
// Holds a GJSON path for the JSON parser
JSONQuery string `toml:"json_query"`
// Key of the time field
JSONTimeKey string `toml:"json_time_key"`
// Format of the time field
JSONTimeFormat string `toml:"json_time_format"`
// Default timezone of the time field
JSONTimezone string `toml:"json_timezone"`
// Whether to error out, rather than continue, if a JSON object can't be coerced
JSONStrict bool `toml:"json_strict"`
// Authentication file for collectd
CollectdAuthFile string `toml:"collectd_auth_file"`
// One of none (default), sign, or encrypt
CollectdSecurityLevel string `toml:"collectd_security_level"`
// Dataset specification for collectd
CollectdTypesDB []string `toml:"collectd_types_db"`
// whether to split or join multivalue metrics
CollectdSplit string `toml:"collectd_split"`
// DataType only applies to value; this is the type the value will be parsed to
DataType string `toml:"data_type"`
// DefaultTags are the default tags that will be added to all parsed metrics.
DefaultTags map[string]string `toml:"default_tags"`
// an optional json path containing the metric registry object
// if left empty, the whole json object is parsed as a metric registry
DropwizardMetricRegistryPath string `toml:"dropwizard_metric_registry_path"`
// an optional json path containing the default time of the metrics
// if left empty, the processing time is used
DropwizardTimePath string `toml:"dropwizard_time_path"`
// time format to use for parsing the time field
// defaults to time.RFC3339
DropwizardTimeFormat string `toml:"dropwizard_time_format"`
// an optional json path pointing to a json object with tag key/value pairs
// takes precedence over DropwizardTagPathsMap
DropwizardTagsPath string `toml:"dropwizard_tags_path"`
// an optional map containing tag names as keys and json paths to retrieve the tag values from as values
// used if TagsPath is empty or doesn't return any tags
DropwizardTagPathsMap map[string]string `toml:"dropwizard_tag_paths_map"`
// Grok configuration
GrokPatterns []string `toml:"grok_patterns"`
GrokNamedPatterns []string `toml:"grok_named_patterns"`
GrokCustomPatterns string `toml:"grok_custom_patterns"`
GrokCustomPatternFiles []string `toml:"grok_custom_pattern_files"`
GrokTimezone string `toml:"grok_timezone"`
GrokUniqueTimestamp string `toml:"grok_unique_timestamp"`
// CSV configuration
CSVColumnNames []string `toml:"csv_column_names"`
CSVColumnTypes []string `toml:"csv_column_types"`
CSVComment string `toml:"csv_comment"`
CSVDelimiter string `toml:"csv_delimiter"`
CSVHeaderRowCount int `toml:"csv_header_row_count"`
CSVMeasurementColumn string `toml:"csv_measurement_column"`
CSVSkipColumns int `toml:"csv_skip_columns"`
CSVSkipRows int `toml:"csv_skip_rows"`
CSVTagColumns []string `toml:"csv_tag_columns"`
CSVTimestampColumn string `toml:"csv_timestamp_column"`
CSVTimestampFormat string `toml:"csv_timestamp_format"`
CSVTimezone string `toml:"csv_timezone"`
CSVTrimSpace bool `toml:"csv_trim_space"`
CSVSkipValues []string `toml:"csv_skip_values"`
CSVSkipErrors bool `toml:"csv_skip_errors"`
CSVMetadataRows int `toml:"csv_metadata_rows"`
CSVMetadataSeparators []string `toml:"csv_metadata_separators"`
CSVMetadataTrimSet string `toml:"csv_metadata_trim_set"`
// FormData configuration
FormUrlencodedTagKeys []string `toml:"form_urlencoded_tag_keys"`
// Prometheus configuration
PrometheusIgnoreTimestamp bool `toml:"prometheus_ignore_timestamp"`
// Value configuration
ValueFieldName string `toml:"value_field_name"`
// XPath configuration
XPathPrintDocument bool `toml:"xpath_print_document"`
XPathProtobufFile string `toml:"xpath_protobuf_file"`
XPathProtobufType string `toml:"xpath_protobuf_type"`
XPathProtobufImportPaths []string `toml:"xpath_protobuf_import_paths"`
XPathConfig []XPathConfig
// JSONPath configuration
JSONV2Config []JSONV2Config `toml:"json_v2"`
// Influx configuration
InfluxParserType string `toml:"influx_parser_type"`
}
type XPathConfig xpath.Config
type JSONV2Config struct {
json_v2.Config
}
// NewParser returns a Parser interface based on the given config.
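//
// Illustrative sketch (field values are placeholders):
//
//	parser, err := parsers.NewParser(&parsers.Config{
//		DataFormat: "json",
//		MetricName: "example",
//		TagKeys:    []string{"host"},
//	})
//	if err != nil {
//		// handle the error
//	}
//	// parser now satisfies the Parser interface and can be used via Parse / ParseLine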
func NewParser(config *Config) (Parser, error) {
var err error
var parser Parser
switch config.DataFormat {
case "json":
parser, err = json.New(
&json.Config{
MetricName: config.MetricName,
TagKeys: config.TagKeys,
NameKey: config.JSONNameKey,
StringFields: config.JSONStringFields,
Query: config.JSONQuery,
TimeKey: config.JSONTimeKey,
TimeFormat: config.JSONTimeFormat,
Timezone: config.JSONTimezone,
DefaultTags: config.DefaultTags,
Strict: config.JSONStrict,
},
)
case "value":
parser, err = NewValueParser(config.MetricName,
config.DataType, config.ValueFieldName, config.DefaultTags)
case "influx":
if config.InfluxParserType == "upstream" {
parser, err = NewInfluxUpstreamParser()
} else {
parser, err = NewInfluxParser()
}
case "nagios":
parser, err = NewNagiosParser()
case "graphite":
parser, err = NewGraphiteParser(config.Separator,
config.Templates, config.DefaultTags)
case "collectd":
parser, err = NewCollectdParser(config.CollectdAuthFile,
config.CollectdSecurityLevel, config.CollectdTypesDB, config.CollectdSplit)
case "dropwizard":
parser, err = NewDropwizardParser(
config.DropwizardMetricRegistryPath,
config.DropwizardTimePath,
config.DropwizardTimeFormat,
config.DropwizardTagsPath,
config.DropwizardTagPathsMap,
config.DefaultTags,
config.Separator,
config.Templates)
case "wavefront":
parser, err = NewWavefrontParser(config.DefaultTags)
case "grok":
parser, err = newGrokParser(
config.MetricName,
config.GrokPatterns,
config.GrokNamedPatterns,
config.GrokCustomPatterns,
config.GrokCustomPatternFiles,
config.GrokTimezone,
config.GrokUniqueTimestamp)
case "logfmt":
parser, err = NewLogFmtParser(config.MetricName, config.DefaultTags)
case "form_urlencoded":
parser, err = NewFormUrlencodedParser(
config.MetricName,
config.DefaultTags,
config.FormUrlencodedTagKeys,
)
case "prometheus":
parser, err = NewPrometheusParser(
config.DefaultTags,
config.PrometheusIgnoreTimestamp,
)
case "prometheusremotewrite":
parser, err = NewPrometheusRemoteWriteParser(config.DefaultTags)
case "xml", "xpath_json", "xpath_msgpack", "xpath_protobuf":
parser = &xpath.Parser{
Format: config.DataFormat,
ProtobufMessageDef: config.XPathProtobufFile,
ProtobufMessageType: config.XPathProtobufType,
ProtobufImportPaths: config.XPathProtobufImportPaths,
PrintDocument: config.XPathPrintDocument,
DefaultTags: config.DefaultTags,
Configs: NewXPathParserConfigs(config.MetricName, config.XPathConfig),
}
case "json_v2":
parser, err = NewJSONPathParser(config.JSONV2Config)
default:
creator, found := Parsers[config.DataFormat]
if !found {
return nil, fmt.Errorf("invalid data format: %s", config.DataFormat)
}
// Try to create new-style parsers the old way...
// DEPRECATED: Please instantiate the parser directly instead of using this function.
parser = creator(config.MetricName)
p, ok := parser.(ParserCompatibility)
if !ok {
return nil, fmt.Errorf("parser for %q cannot be created the old way", config.DataFormat)
}
err = p.InitFromConfig(config)
}
return parser, err
}
func newGrokParser(metricName string,
patterns []string, nPatterns []string,
cPatterns string, cPatternFiles []string,
tZone string, uniqueTimestamp string) (Parser, error) {
parser := grok.Parser{
Measurement: metricName,
Patterns: patterns,
NamedPatterns: nPatterns,
CustomPatterns: cPatterns,
CustomPatternFiles: cPatternFiles,
Timezone: tZone,
UniqueTimestamp: uniqueTimestamp,
}
err := parser.Compile()
return &parser, err
}
func NewNagiosParser() (Parser, error) {
return &nagios.NagiosParser{}, nil
}
func NewInfluxParser() (Parser, error) {
handler := influx.NewMetricHandler()
return influx.NewParser(handler), nil
}
func NewInfluxUpstreamParser() (Parser, error) {
return influx_upstream.NewParser(), nil
}
func NewGraphiteParser(
separator string,
templates []string,
defaultTags map[string]string,
) (Parser, error) {
return graphite.NewGraphiteParser(separator, templates, defaultTags)
}
func NewValueParser(
metricName string,
dataType string,
fieldName string,
defaultTags map[string]string,
) (Parser, error) {
return value.NewValueParser(metricName, dataType, fieldName, defaultTags), nil
}
func NewCollectdParser(
authFile string,
securityLevel string,
typesDB []string,
split string,
) (Parser, error) {
return collectd.NewCollectdParser(authFile, securityLevel, typesDB, split)
}
func NewDropwizardParser(
metricRegistryPath string,
timePath string,
timeFormat string,
tagsPath string,
tagPathsMap map[string]string,
defaultTags map[string]string,
separator string,
templates []string,
) (Parser, error) {
parser := dropwizard.NewParser()
parser.MetricRegistryPath = metricRegistryPath
parser.TimePath = timePath
parser.TimeFormat = timeFormat
parser.TagsPath = tagsPath
parser.TagPathsMap = tagPathsMap
parser.DefaultTags = defaultTags
err := parser.SetTemplates(separator, templates)
if err != nil {
return nil, err
}
return parser, nil
}
// NewLogFmtParser returns a logfmt parser with the default options.
func NewLogFmtParser(metricName string, defaultTags map[string]string) (Parser, error) {
return logfmt.NewParser(metricName, defaultTags), nil
}
func NewWavefrontParser(defaultTags map[string]string) (Parser, error) {
return wavefront.NewWavefrontParser(defaultTags), nil
}
func NewFormUrlencodedParser(
metricName string,
defaultTags map[string]string,
tagKeys []string,
) (Parser, error) {
return &form_urlencoded.Parser{
MetricName: metricName,
DefaultTags: defaultTags,
TagKeys: tagKeys,
}, nil
}
func NewPrometheusParser(defaultTags map[string]string, ignoreTimestamp bool) (Parser, error) {
return &prometheus.Parser{
DefaultTags: defaultTags,
IgnoreTimestamp: ignoreTimestamp,
}, nil
}
func NewPrometheusRemoteWriteParser(defaultTags map[string]string) (Parser, error) {
return &prometheusremotewrite.Parser{
DefaultTags: defaultTags,
}, nil
}
func NewXPathParserConfigs(metricName string, cfgs []XPathConfig) []xpath.Config {
// Convert the configs; this is a one-to-one copy apart from setting the default metric name
configs := make([]xpath.Config, 0, len(cfgs))
for _, cfg := range cfgs {
config := xpath.Config(cfg)
config.MetricDefaultName = metricName
configs = append(configs, config)
}
return configs
}
func NewJSONPathParser(jsonv2config []JSONV2Config) (Parser, error) {
configs := make([]json_v2.Config, len(jsonv2config))
for i, cfg := range jsonv2config {
configs[i].MeasurementName = cfg.MeasurementName
configs[i].MeasurementNamePath = cfg.MeasurementNamePath
configs[i].TimestampPath = cfg.TimestampPath
configs[i].TimestampFormat = cfg.TimestampFormat
configs[i].TimestampTimezone = cfg.TimestampTimezone
configs[i].Fields = cfg.Fields
configs[i].Tags = cfg.Tags
configs[i].JSONObjects = cfg.JSONObjects
}
return &json_v2.Parser{
Configs: configs,
}, nil
}