// config/config.go
package config
import (
"bytes"
"crypto/tls"
"fmt"
"io"
"log"
"net/http"
"net/url"
"os"
"path/filepath"
"reflect"
"regexp"
"runtime"
"sort"
"strconv"
"strings"
"time"
"github.com/coreos/go-semver/semver"
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/internal"
"github.com/influxdata/telegraf/internal/choice"
"github.com/influxdata/telegraf/models"
"github.com/influxdata/telegraf/plugins/aggregators"
"github.com/influxdata/telegraf/plugins/inputs"
"github.com/influxdata/telegraf/plugins/outputs"
"github.com/influxdata/telegraf/plugins/parsers"
"github.com/influxdata/telegraf/plugins/parsers/json_v2"
"github.com/influxdata/telegraf/plugins/processors"
"github.com/influxdata/telegraf/plugins/serializers"
"github.com/influxdata/toml"
"github.com/influxdata/toml/ast"
)
var (
// Default sections
sectionDefaults = []string{"global_tags", "agent", "outputs",
"processors", "aggregators", "inputs"}
// Default input plugins
inputDefaults = []string{"cpu", "mem", "swap", "system", "kernel",
"processes", "disk", "diskio"}
// Default output plugins
outputDefaults = []string{"influxdb"}
// envVarRe is a regex to find environment variables in the config file
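// Both the braced and the bare form are matched, e.g. "${HOSTNAME}" and "$HOSTNAME".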
envVarRe = regexp.MustCompile(`\$\{(\w+)\}|\$(\w+)`)
envVarEscaper = strings.NewReplacer(
`"`, `\"`,
`\`, `\\`,
)
httpLoadConfigRetryInterval = 10 * time.Second
// fetchURLRe is a regex to determine whether the requested file should
// be fetched from a remote or read from the filesystem.
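// e.g. "https://example.com/telegraf.conf" is fetched remotely, while
// "/etc/telegraf/telegraf.conf" is read from disk.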
fetchURLRe = regexp.MustCompile(`^\w+://`)
)
// Config holds the agent settings, global tags, and all plugin instances
// (inputs, outputs, processors, aggregators, parsers) that the user has
// specified
type Config struct {
toml *toml.Config
errs []error // config load errors.
UnusedFields map[string]bool
AllowUnusedFields bool
Tags map[string]string
InputFilters []string
OutputFilters []string
Agent *AgentConfig
Inputs []*models.RunningInput
Outputs []*models.RunningOutput
Aggregators []*models.RunningAggregator
Parsers []*models.RunningParser
// Processors have a slice wrapper type because they need to be sorted
Processors models.RunningProcessors
AggProcessors models.RunningProcessors
Deprecations map[string][]int64
version *semver.Version
}
// NewConfig creates a new struct to hold the Telegraf config.
// For historical reasons, it holds the actual instances of the running plugins
// once the configuration is parsed.
func NewConfig() *Config {
c := &Config{
UnusedFields: map[string]bool{},
// Agent defaults:
Agent: &AgentConfig{
Interval: Duration(10 * time.Second),
RoundInterval: true,
FlushInterval: Duration(10 * time.Second),
LogTarget: "file",
LogfileRotationMaxArchives: 5,
},
Tags: make(map[string]string),
Inputs: make([]*models.RunningInput, 0),
Outputs: make([]*models.RunningOutput, 0),
Parsers: make([]*models.RunningParser, 0),
Processors: make([]*models.RunningProcessor, 0),
AggProcessors: make([]*models.RunningProcessor, 0),
InputFilters: make([]string, 0),
OutputFilters: make([]string, 0),
Deprecations: make(map[string][]int64),
}
// Handle unknown version
version := internal.Version()
if version == "" || version == "unknown" {
version = "0.0.0-unknown"
}
c.version = semver.New(version)
tomlCfg := &toml.Config{
NormFieldName: toml.DefaultConfig.NormFieldName,
FieldToKey: toml.DefaultConfig.FieldToKey,
MissingField: c.missingTomlField,
}
c.toml = tomlCfg
return c
}
// AgentConfig defines configuration that will be used by the Telegraf agent
type AgentConfig struct {
// Interval at which to gather information
Interval Duration
// RoundInterval rounds collection interval to 'interval'.
// ie, if Interval=10s then always collect on :00, :10, :20, etc.
RoundInterval bool
// Collected metrics are rounded to the precision specified. Precision is
// specified as an interval with an integer + unit (e.g. 0s, 10ms, 2us, 4s).
// Valid time units are "ns", "us" (or "µs"), "ms", "s".
//
// By default or when set to "0s", precision will be set to the same
// timestamp order as the collection interval, with the maximum being 1s:
// ie, when interval = "10s", precision will be "1s"
// when interval = "250ms", precision will be "1ms"
//
// Precision will NOT be used for service inputs. It is up to each individual
// service input to set the timestamp at the appropriate precision.
Precision Duration
// CollectionJitter is used to jitter the collection by a random amount.
// Each plugin will sleep for a random time within jitter before collecting.
// This can be used to avoid many plugins querying things like sysfs at the
// same time, which can have a measurable effect on the system.
CollectionJitter Duration
// CollectionOffset is used to shift the collection by the given amount.
// This can be used to avoid many plugins querying constrained devices
// at the same time by manually scheduling them in time.
CollectionOffset Duration
// FlushInterval is the Interval at which to flush data
FlushInterval Duration
// FlushJitter Jitters the flush interval by a random amount.
// This is primarily to avoid large write spikes for users running a large
// number of telegraf instances.
// ie, a jitter of 5s and interval 10s means flushes will happen every 10-15s
FlushJitter Duration
// MetricBatchSize is the maximum number of metrics written to an
// output plugin in one call.
MetricBatchSize int
// MetricBufferLimit is the max number of metrics that each output plugin
// will cache. The buffer is cleared when a successful write occurs. When
// full, the oldest metrics will be overwritten. This number should be a
// multiple of MetricBatchSize. Due to the current implementation, it cannot
// be less than 2 times MetricBatchSize.
MetricBufferLimit int
// FlushBufferWhenFull tells Telegraf to flush the metric buffer whenever
// it fills up, regardless of FlushInterval. Setting this option to true
// does _not_ deactivate FlushInterval.
FlushBufferWhenFull bool `toml:"flush_buffer_when_full" deprecated:"0.13.0;2.0.0;option is ignored"`
// TODO(cam): Remove the UTC parameter; it is no longer valid for the
// agent config. Leaving it here for now for backwards compatibility.
UTC bool `toml:"utc" deprecated:"1.0.0;option is ignored"`
// Debug is the option for running in debug mode
Debug bool `toml:"debug"`
// Quiet is the option for running in quiet mode
Quiet bool `toml:"quiet"`
// Log target controls the destination for logs and can be one of "file",
// "stderr" or, on Windows, "eventlog". When set to "file", the output file
// is determined by the "logfile" setting.
LogTarget string `toml:"logtarget"`
// Name of the file to be logged to when using the "file" logtarget. If set to
// the empty string then logs are written to stderr.
Logfile string `toml:"logfile"`
// The file will be rotated after the time interval specified. When set
// to 0 no time based rotation is performed.
LogfileRotationInterval Duration `toml:"logfile_rotation_interval"`
// The logfile will be rotated when it becomes larger than the specified
// size. When set to 0 no size based rotation is performed.
LogfileRotationMaxSize Size `toml:"logfile_rotation_max_size"`
// Maximum number of rotated archives to keep, any older logs are deleted.
// If set to -1, no archives are removed.
LogfileRotationMaxArchives int `toml:"logfile_rotation_max_archives"`
// Pick a timezone to use when logging or type 'local' for local time.
LogWithTimezone string `toml:"log_with_timezone"`
Hostname string
OmitHostname bool
// Method for translating SNMP objects. 'netsnmp' to call external programs,
// 'gosmi' to use the built-in library.
SnmpTranslator string `toml:"snmp_translator"`
}
// InputNames returns a list of strings of the configured inputs.
func (c *Config) InputNames() []string {
var name []string
for _, input := range c.Inputs {
name = append(name, input.Config.Name)
}
return PluginNameCounts(name)
}
// AggregatorNames returns a list of strings of the configured aggregators.
func (c *Config) AggregatorNames() []string {
var name []string
for _, aggregator := range c.Aggregators {
name = append(name, aggregator.Config.Name)
}
return PluginNameCounts(name)
}
// ParserNames returns a list of strings of the configured parsers.
func (c *Config) ParserNames() []string {
var name []string
for _, parser := range c.Parsers {
name = append(name, parser.Config.DataFormat)
}
return PluginNameCounts(name)
}
// ProcessorNames returns a list of strings of the configured processors.
func (c *Config) ProcessorNames() []string {
var name []string
for _, processor := range c.Processors {
name = append(name, processor.Config.Name)
}
return PluginNameCounts(name)
}
// OutputNames returns a list of strings of the configured outputs.
func (c *Config) OutputNames() []string {
var name []string
for _, output := range c.Outputs {
name = append(name, output.Config.Name)
}
return PluginNameCounts(name)
}
// PluginNameCounts returns a list of sorted plugin names and their count
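// e.g. ["cpu", "cpu", "mem"] yields ["cpu (2x)", "mem"].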
func PluginNameCounts(plugins []string) []string {
names := make(map[string]int)
for _, plugin := range plugins {
names[plugin]++
}
var namecount []string
for name, count := range names {
if count == 1 {
namecount = append(namecount, name)
} else {
namecount = append(namecount, fmt.Sprintf("%s (%dx)", name, count))
}
}
sort.Strings(namecount)
return namecount
}
// ListTags returns a string of tags specified in the config,
// line-protocol style
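// e.g. Tags {"dc": "us-east-1", "rack": "1a"} yields "dc=us-east-1 rack=1a".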
func (c *Config) ListTags() string {
var tags []string
for k, v := range c.Tags {
tags = append(tags, fmt.Sprintf("%s=%s", k, v))
}
sort.Strings(tags)
return strings.Join(tags, " ")
}
var header = `# Telegraf Configuration
#
# Telegraf is entirely plugin driven. All metrics are gathered from the
# declared inputs, and sent to the declared outputs.
#
# Plugins must be declared in here to be active.
# To deactivate a plugin, comment out the name and any variables.
#
# Use 'telegraf -config telegraf.conf -test' to see what metrics a config
# file would generate.
#
# Environment variables can be used anywhere in this config file, simply surround
# them with ${}. For strings the variable must be within quotes (ie, "${STR_VAR}"),
# for numbers and booleans they should be plain (ie, ${INT_VAR}, ${BOOL_VAR})
`
var globalTagsConfig = `
# Global tags can be specified here in key="value" format.
[global_tags]
# dc = "us-east-1" # will tag all metrics with dc=us-east-1
# rack = "1a"
## Environment variables can be used as tags, and throughout the config file
# user = "$USER"
`
var agentConfig = `
# Configuration for telegraf agent
[agent]
## Default data collection interval for all inputs
interval = "10s"
## Rounds collection interval to 'interval'
## ie, if interval="10s" then always collect on :00, :10, :20, etc.
round_interval = true
## Telegraf will send metrics to outputs in batches of at most
## metric_batch_size metrics.
## This controls the size of writes that Telegraf sends to output plugins.
metric_batch_size = 1000
## Maximum number of unwritten metrics per output. Increasing this value
## allows for longer periods of output downtime without dropping metrics at the
## cost of higher maximum memory usage.
metric_buffer_limit = 10000
## Collection jitter is used to jitter the collection by a random amount.
## Each plugin will sleep for a random time within jitter before collecting.
## This can be used to avoid many plugins querying things like sysfs at the
## same time, which can have a measurable effect on the system.
collection_jitter = "0s"
## Collection offset is used to shift the collection by the given amount.
## This can be used to avoid many plugins querying constrained devices
## at the same time by manually scheduling them in time.
# collection_offset = "0s"
## Default flushing interval for all outputs. Maximum flush_interval will be
## flush_interval + flush_jitter
flush_interval = "10s"
## Jitter the flush interval by a random amount. This is primarily to avoid
## large write spikes for users running a large number of telegraf instances.
## ie, a jitter of 5s and interval 10s means flushes will happen every 10-15s
flush_jitter = "0s"
## Collected metrics are rounded to the precision specified. Precision is
## specified as an interval with an integer + unit (e.g. 0s, 10ms, 2us, 4s).
## Valid time units are "ns", "us" (or "µs"), "ms", "s".
##
## By default or when set to "0s", precision will be set to the same
## timestamp order as the collection interval, with the maximum being 1s:
## ie, when interval = "10s", precision will be "1s"
## when interval = "250ms", precision will be "1ms"
##
## Precision will NOT be used for service inputs. It is up to each individual
## service input to set the timestamp at the appropriate precision.
precision = "0s"
## Log at debug level.
# debug = false
## Log only error level messages.
# quiet = false
## Log target controls the destination for logs and can be one of "file",
## "stderr" or, on Windows, "eventlog". When set to "file", the output file
## is determined by the "logfile" setting.
# logtarget = "file"
## Name of the file to be logged to when using the "file" logtarget. If set to
## the empty string then logs are written to stderr.
# logfile = ""
## The logfile will be rotated after the time interval specified. When set
## to 0 no time based rotation is performed. Logs are rotated only when
## written to, if there is no log activity rotation may be delayed.
# logfile_rotation_interval = "0d"
## The logfile will be rotated when it becomes larger than the specified
## size. When set to 0 no size based rotation is performed.
# logfile_rotation_max_size = "0MB"
## Maximum number of rotated archives to keep, any older logs are deleted.
## If set to -1, no archives are removed.
# logfile_rotation_max_archives = 5
## Pick a timezone to use when logging or type 'local' for local time.
## Example: America/Chicago
# log_with_timezone = ""
## Override default hostname, if empty use os.Hostname()
hostname = ""
## If set to true, do not set the "host" tag in the telegraf agent.
omit_hostname = false
## Method of translating SNMP objects. Can be "netsnmp" which
## translates by calling external programs snmptranslate and snmptable,
## or "gosmi" which translates using the built-in gosmi library.
# snmp_translator = "netsnmp"
`
var outputHeader = `
###############################################################################
# OUTPUT PLUGINS #
###############################################################################
`
var processorHeader = `
###############################################################################
# PROCESSOR PLUGINS #
###############################################################################
`
var aggregatorHeader = `
###############################################################################
# AGGREGATOR PLUGINS #
###############################################################################
`
var inputHeader = `
###############################################################################
# INPUT PLUGINS #
###############################################################################
`
var serviceInputHeader = `
###############################################################################
# SERVICE INPUT PLUGINS #
###############################################################################
`
// PrintSampleConfig prints the sample config
func PrintSampleConfig(
sectionFilters []string,
inputFilters []string,
outputFilters []string,
aggregatorFilters []string,
processorFilters []string,
) {
// print headers
fmt.Print(header)
if len(sectionFilters) == 0 {
sectionFilters = sectionDefaults
}
printFilteredGlobalSections(sectionFilters)
// print output plugins
if sliceContains("outputs", sectionFilters) {
if len(outputFilters) != 0 {
if len(outputFilters) >= 3 && outputFilters[1] != "none" {
fmt.Print(outputHeader)
}
printFilteredOutputs(outputFilters, false)
} else {
fmt.Print(outputHeader)
printFilteredOutputs(outputDefaults, false)
// Print non-default outputs, commented
var pnames []string
for pname := range outputs.Outputs {
if !sliceContains(pname, outputDefaults) {
pnames = append(pnames, pname)
}
}
sort.Strings(pnames)
printFilteredOutputs(pnames, true)
}
}
// print processor plugins
if sliceContains("processors", sectionFilters) {
if len(processorFilters) != 0 {
if len(processorFilters) >= 3 && processorFilters[1] != "none" {
fmt.Print(processorHeader)
}
printFilteredProcessors(processorFilters, false)
} else {
fmt.Print(processorHeader)
pnames := []string{}
for pname := range processors.Processors {
pnames = append(pnames, pname)
}
sort.Strings(pnames)
printFilteredProcessors(pnames, true)
}
}
// print aggregator plugins
if sliceContains("aggregators", sectionFilters) {
if len(aggregatorFilters) != 0 {
if len(aggregatorFilters) >= 3 && aggregatorFilters[1] != "none" {
fmt.Print(aggregatorHeader)
}
printFilteredAggregators(aggregatorFilters, false)
} else {
fmt.Print(aggregatorHeader)
pnames := []string{}
for pname := range aggregators.Aggregators {
pnames = append(pnames, pname)
}
sort.Strings(pnames)
printFilteredAggregators(pnames, true)
}
}
// print input plugins
if sliceContains("inputs", sectionFilters) {
if len(inputFilters) != 0 {
if len(inputFilters) >= 3 && inputFilters[1] != "none" {
fmt.Print(inputHeader)
}
printFilteredInputs(inputFilters, false)
} else {
fmt.Print(inputHeader)
printFilteredInputs(inputDefaults, false)
// Print non-default inputs, commented
var pnames []string
for pname := range inputs.Inputs {
if !sliceContains(pname, inputDefaults) {
pnames = append(pnames, pname)
}
}
sort.Strings(pnames)
printFilteredInputs(pnames, true)
}
}
}
func printFilteredProcessors(processorFilters []string, commented bool) {
// Filter processors
var pnames []string
for pname := range processors.Processors {
if sliceContains(pname, processorFilters) {
pnames = append(pnames, pname)
}
}
sort.Strings(pnames)
// Print Processors
for _, pname := range pnames {
creator := processors.Processors[pname]
processor := creator()
printConfig(pname, processor, "processors", commented, processors.Deprecations[pname])
}
}
func printFilteredAggregators(aggregatorFilters []string, commented bool) {
// Filter aggregators
var anames []string
for aname := range aggregators.Aggregators {
if sliceContains(aname, aggregatorFilters) {
anames = append(anames, aname)
}
}
sort.Strings(anames)
// Print Aggregators
for _, aname := range anames {
creator := aggregators.Aggregators[aname]
aggregator := creator()
printConfig(aname, aggregator, "aggregators", commented, aggregators.Deprecations[aname])
}
}
func printFilteredInputs(inputFilters []string, commented bool) {
// Filter inputs
var pnames []string
for pname := range inputs.Inputs {
if sliceContains(pname, inputFilters) {
pnames = append(pnames, pname)
}
}
sort.Strings(pnames)
// cache service inputs to print them at the end
servInputs := make(map[string]telegraf.ServiceInput)
// for alphabetical looping:
servInputNames := []string{}
// Print Inputs
for _, pname := range pnames {
// Skip inputs that are registered twice for backward compatibility
if pname == "cisco_telemetry_gnmi" {
continue
}
if pname == "KNXListener" {
continue
}
creator := inputs.Inputs[pname]
input := creator()
if p, ok := input.(telegraf.ServiceInput); ok {
servInputs[pname] = p
servInputNames = append(servInputNames, pname)
continue
}
printConfig(pname, input, "inputs", commented, inputs.Deprecations[pname])
}
// Print Service Inputs
if len(servInputs) == 0 {
return
}
sort.Strings(servInputNames)
fmt.Print(serviceInputHeader)
for _, name := range servInputNames {
printConfig(name, servInputs[name], "inputs", commented, inputs.Deprecations[name])
}
}
func printFilteredOutputs(outputFilters []string, commented bool) {
// Filter outputs
var onames []string
for oname := range outputs.Outputs {
if sliceContains(oname, outputFilters) {
onames = append(onames, oname)
}
}
sort.Strings(onames)
// Print Outputs
for _, oname := range onames {
creator := outputs.Outputs[oname]
output := creator()
printConfig(oname, output, "outputs", commented, outputs.Deprecations[oname])
}
}
func printFilteredGlobalSections(sectionFilters []string) {
if sliceContains("global_tags", sectionFilters) {
fmt.Print(globalTagsConfig)
}
if sliceContains("agent", sectionFilters) {
fmt.Print(agentConfig)
}
}
func printConfig(name string, p telegraf.PluginDescriber, op string, commented bool, di telegraf.DeprecationInfo) {
comment := ""
if commented {
comment = "# "
}
fmt.Printf("\n%s# %s\n%s[[%s.%s]]", comment, p.Description(), comment, op, name)
if di.Since != "" {
removalNote := ""
if di.RemovalIn != "" {
removalNote = " and will be removed in " + di.RemovalIn
}
fmt.Printf("\n%s ## DEPRECATED: The '%s' plugin is deprecated in version %s%s, %s.", comment, name, di.Since, removalNote, di.Notice)
}
config := p.SampleConfig()
if config == "" {
fmt.Printf("\n%s # no configuration\n\n", comment)
} else {
lines := strings.Split(config, "\n")
for i, line := range lines {
if i == 0 || i == len(lines)-1 {
fmt.Print("\n")
continue
}
fmt.Print(strings.TrimRight(comment+line, " ") + "\n")
}
}
}
func sliceContains(name string, list []string) bool {
for _, b := range list {
if b == name {
return true
}
}
return false
}
// PrintInputConfig prints the config usage of a single input.
func PrintInputConfig(name string) error {
creator, ok := inputs.Inputs[name]
if !ok {
return fmt.Errorf("input %s not found", name)
}
printConfig(name, creator(), "inputs", false, inputs.Deprecations[name])
return nil
}
// PrintOutputConfig prints the config usage of a single output.
func PrintOutputConfig(name string) error {
creator, ok := outputs.Outputs[name]
if !ok {
return fmt.Errorf("output %s not found", name)
}
printConfig(name, creator(), "outputs", false, outputs.Deprecations[name])
return nil
}
// LoadDirectory loads all toml config files found in the specified path, recursively.
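// Only files ending in ".conf" are loaded; directories whose names start
// with ".." (e.g. Kubernetes atomic-writer mounts) are skipped.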
func (c *Config) LoadDirectory(path string) error {
walkfn := func(thispath string, info os.FileInfo, _ error) error {
if info == nil {
log.Printf("W! Telegraf is not permitted to read %s", thispath)
return nil
}
if info.IsDir() {
if strings.HasPrefix(info.Name(), "..") {
// skip Kubernetes mounts, preventing the same config from being loaded twice
return filepath.SkipDir
}
return nil
}
name := info.Name()
if len(name) < 6 || name[len(name)-5:] != ".conf" {
return nil
}
err := c.LoadConfig(thispath)
if err != nil {
return err
}
return nil
}
return filepath.Walk(path, walkfn)
}
// Try to find a default config file at these locations (in order):
// 1. $TELEGRAF_CONFIG_PATH
// 2. $HOME/.telegraf/telegraf.conf
// 3. /etc/telegraf/telegraf.conf
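// Any of these may also be a URL (e.g. "https://example.com/telegraf.conf"),
// in which case the config is fetched remotely by loadConfig.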
func getDefaultConfigPath() (string, error) {
envfile := os.Getenv("TELEGRAF_CONFIG_PATH")
homefile := os.ExpandEnv("${HOME}/.telegraf/telegraf.conf")
etcfile := "/etc/telegraf/telegraf.conf"
if runtime.GOOS == "windows" {
programFiles := os.Getenv("ProgramFiles")
if programFiles == "" { // Should never happen
programFiles = `C:\Program Files`
}
etcfile = programFiles + `\Telegraf\telegraf.conf`
}
for _, path := range []string{envfile, homefile, etcfile} {
if isURL(path) {
log.Printf("I! Using config url: %s", path)
return path, nil
}
if _, err := os.Stat(path); err == nil {
log.Printf("I! Using config file: %s", path)
return path, nil
}
}
// if we got here, we didn't find a file in a default location
return "", fmt.Errorf("No config file specified, and could not find one"+
" in $TELEGRAF_CONFIG_PATH, %s, or %s", homefile, etcfile)
}
// isURL reports whether the string is a valid URL with both a scheme and a host.
func isURL(str string) bool {
u, err := url.Parse(str)
return err == nil && u.Scheme != "" && u.Host != ""
}
// LoadConfig loads the given config file and applies it to c
func (c *Config) LoadConfig(path string) error {
var err error
if path == "" {
if path, err = getDefaultConfigPath(); err != nil {
return err
}
}
data, err := loadConfig(path)
if err != nil {
return fmt.Errorf("Error loading config file %s: %w", path, err)
}
if err = c.LoadConfigData(data); err != nil {
return fmt.Errorf("Error loading config file %s: %w", path, err)
}
return nil
}
// LoadConfigData loads TOML-formatted config data
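// A minimal example of the accepted format (illustrative, plugin options vary):
//
//	[agent]
//	  interval = "10s"
//
//	[[inputs.cpu]]
//
//	[[outputs.influxdb]]
//	  urls = ["http://127.0.0.1:8086"]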
func (c *Config) LoadConfigData(data []byte) error {
tbl, err := parseConfig(data)
if err != nil {
return fmt.Errorf("Error parsing data: %s", err)
}
// Parse tags tables first:
for _, tableName := range []string{"tags", "global_tags"} {
if val, ok := tbl.Fields[tableName]; ok {
subTable, ok := val.(*ast.Table)
if !ok {
return fmt.Errorf("invalid configuration, bad table name %q", tableName)
}
if err = c.toml.UnmarshalTable(subTable, c.Tags); err != nil {
return fmt.Errorf("error parsing table name %q: %s", tableName, err)
}
}
}
// Parse agent table:
if val, ok := tbl.Fields["agent"]; ok {
subTable, ok := val.(*ast.Table)
if !ok {
return fmt.Errorf("invalid configuration, error parsing agent table")
}
if err = c.toml.UnmarshalTable(subTable, c.Agent); err != nil {
return fmt.Errorf("error parsing [agent]: %w", err)
}
}
if !c.Agent.OmitHostname {
if c.Agent.Hostname == "" {
hostname, err := os.Hostname()
if err != nil {
return err
}
c.Agent.Hostname = hostname
}
c.Tags["host"] = c.Agent.Hostname
}
// Set snmp agent translator default
if c.Agent.SnmpTranslator == "" {
c.Agent.SnmpTranslator = "netsnmp"
}
if !c.AllowUnusedFields && len(c.UnusedFields) > 0 {
return fmt.Errorf("line %d: configuration specified the fields %q, but they weren't used", tbl.Line, keys(c.UnusedFields))
}
// Parse all the rest of the plugins:
for name, val := range tbl.Fields {
subTable, ok := val.(*ast.Table)
if !ok {
return fmt.Errorf("invalid configuration, error parsing field %q as table", name)
}
switch name {
case "agent", "global_tags", "tags":
case "outputs":
for pluginName, pluginVal := range subTable.Fields {
switch pluginSubTable := pluginVal.(type) {
// legacy [outputs.influxdb] support
case *ast.Table:
if err = c.addOutput(pluginName, pluginSubTable); err != nil {
return fmt.Errorf("error parsing %s, %w", pluginName, err)
}
case []*ast.Table:
for _, t := range pluginSubTable {
if err = c.addOutput(pluginName, t); err != nil {
return fmt.Errorf("error parsing %s array, %w", pluginName, err)
}
}
default:
return fmt.Errorf("unsupported config format: %s",
pluginName)
}
if !c.AllowUnusedFields && len(c.UnusedFields) > 0 {
return fmt.Errorf("plugin %s.%s: line %d: configuration specified the fields %q, but they weren't used", name, pluginName, subTable.Line, keys(c.UnusedFields))
}
}
case "inputs", "plugins":
for pluginName, pluginVal := range subTable.Fields {
switch pluginSubTable := pluginVal.(type) {
// legacy [inputs.cpu] support
case *ast.Table:
if err = c.addInput(pluginName, pluginSubTable); err != nil {
return fmt.Errorf("error parsing %s, %w", pluginName, err)
}
case []*ast.Table:
for _, t := range pluginSubTable {
if err = c.addInput(pluginName, t); err != nil {
return fmt.Errorf("error parsing %s, %w", pluginName, err)
}
}
default:
return fmt.Errorf("Unsupported config format: %s",
pluginName)
}
if !c.AllowUnusedFields && len(c.UnusedFields) > 0 {
return fmt.Errorf("plugin %s.%s: line %d: configuration specified the fields %q, but they weren't used", name, pluginName, subTable.Line, keys(c.UnusedFields))
}
}
case "processors":
for pluginName, pluginVal := range subTable.Fields {
switch pluginSubTable := pluginVal.(type) {
case []*ast.Table:
for _, t := range pluginSubTable {
if err = c.addProcessor(pluginName, t); err != nil {
return fmt.Errorf("error parsing %s, %w", pluginName, err)
}
}
default:
return fmt.Errorf("Unsupported config format: %s",
pluginName)
}
if !c.AllowUnusedFields && len(c.UnusedFields) > 0 {
return fmt.Errorf("plugin %s.%s: line %d: configuration specified the fields %q, but they weren't used", name, pluginName, subTable.Line, keys(c.UnusedFields))
}
}
case "aggregators":
for pluginName, pluginVal := range subTable.Fields {
switch pluginSubTable := pluginVal.(type) {
case []*ast.Table:
for _, t := range pluginSubTable {
if err = c.addAggregator(pluginName, t); err != nil {
return fmt.Errorf("Error parsing %s, %s", pluginName, err)
}
}
default:
return fmt.Errorf("Unsupported config format: %s",
pluginName)
}
if !c.AllowUnusedFields && len(c.UnusedFields) > 0 {
return fmt.Errorf("plugin %s.%s: line %d: configuration specified the fields %q, but they weren't used", name, pluginName, subTable.Line, keys(c.UnusedFields))
}
}
// Assume it's an input for legacy config file support if no other
// identifiers are present
default:
if err = c.addInput(name, subTable); err != nil {
return fmt.Errorf("Error parsing %s, %s", name, err)
}
}
}
if len(c.Processors) > 1 {
sort.Sort(c.Processors)
}
return nil
}
// trimBOM trims the Byte-Order-Marks from the beginning of the file.
// this is for Windows compatibility only.
// see https://github.com/influxdata/telegraf/issues/1378
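// The UTF-8 BOM is the three-byte prefix 0xEF 0xBB 0xBF that some Windows
// editors prepend to text files.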
func trimBOM(f []byte) []byte {
return bytes.TrimPrefix(f, []byte("\xef\xbb\xbf"))
}
// escapeEnv escapes a value for inserting into a TOML string.
func escapeEnv(value string) string {
return envVarEscaper.Replace(value)
}
func loadConfig(config string) ([]byte, error) {
if fetchURLRe.MatchString(config) {
u, err := url.Parse(config)
if err != nil {
return nil, err
}
switch u.Scheme {
case "https", "http":
return fetchConfig(u)
default:
return nil, fmt.Errorf("scheme %q not supported", u.Scheme)
}
}
// If it isn't a URL, try it as a file path
return os.ReadFile(config)
}
func fetchConfig(u *url.URL) ([]byte, error) {
req, err := http.NewRequest("GET", u.String(), nil)
if err != nil {
return nil, err
}
if v, exists := os.LookupEnv("INFLUX_TOKEN"); exists {
req.Header.Add("Authorization", "Token "+v)
}
req.Header.Add("Accept", "application/toml")
req.Header.Set("User-Agent", internal.ProductToken())
retries := 3
for i := 0; i <= retries; i++ {
resp, err := http.DefaultClient.Do(req)
if err != nil {
return nil, fmt.Errorf("Retry %d of %d failed connecting to HTTP config server %s", i, retries, err)
}
if resp.StatusCode != http.StatusOK {
if i < retries {
log.Printf("Error getting HTTP config. Retry %d of %d in %s. Status=%d", i, retries, httpLoadConfigRetryInterval, resp.StatusCode)
time.Sleep(httpLoadConfigRetryInterval)
continue
}
return nil, fmt.Errorf("Retry %d of %d failed to retrieve remote config: %s", i, retries, resp.Status)
}
defer resp.Body.Close()
return io.ReadAll(resp.Body)
}
return nil, nil
}
// parseConfig parses the provided TOML contents and returns the AST
// produced by the TOML parser. Before parsing, it finds environment
// variables in the contents and replaces them with their values.
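// For example, with HOSTNAME=web01 set in the environment, the line
// `hostname = "${HOSTNAME}"` becomes `hostname = "web01"` before parsing.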
func parseConfig(contents []byte) (*ast.Table, error) {
contents = trimBOM(contents)
parameters := envVarRe.FindAllSubmatch(contents, -1)
for _, parameter := range parameters {
if len(parameter) != 3 {
continue
}
var envVar []byte
if parameter[1] != nil {
envVar = parameter[1]
} else if parameter[2] != nil {
envVar = parameter[2]
} else {
continue
}
envVal, ok := os.LookupEnv(strings.TrimPrefix(string(envVar), "$"))
if ok {
envVal = escapeEnv(envVal)
contents = bytes.Replace(contents, parameter[0], []byte(envVal), 1)
}
}
return toml.Parse(contents)
}
func (c *Config) addAggregator(name string, table *ast.Table) error {
creator, ok := aggregators.Aggregators[name]
if !ok {
// Handle removed, deprecated plugins
if di, deprecated := aggregators.Deprecations[name]; deprecated {
printHistoricPluginDeprecationNotice("aggregators", name, di)
return fmt.Errorf("plugin deprecated")
}
return fmt.Errorf("Undefined but requested aggregator: %s", name)
}
aggregator := creator()
conf, err := c.buildAggregator(name, table)
if err != nil {
return err
}
if err := c.toml.UnmarshalTable(table, aggregator); err != nil {
return err
}
if err := c.printUserDeprecation("aggregators", name, aggregator); err != nil {
return err
}
c.Aggregators = append(c.Aggregators, models.NewRunningAggregator(aggregator, conf))
return nil
}
func (c *Config) probeParser(table *ast.Table) bool {
var dataformat string
c.getFieldString(table, "data_format", &dataformat)
_, ok := parsers.Parsers[dataformat]
return ok
}
func (c *Config) addParser(parentname string, table *ast.Table) (*models.RunningParser, error) {
var dataformat string
c.getFieldString(table, "data_format", &dataformat)
creator, ok := parsers.Parsers[dataformat]
if !ok {
return nil, fmt.Errorf("Undefined but requested parser: %s", dataformat)
}
parser := creator(parentname)
conf, err := c.buildParser(parentname, table)
if err != nil {
return nil, err
}
if err := c.toml.UnmarshalTable(table, parser); err != nil {
return nil, err
}
running := models.NewRunningParser(parser, conf)
c.Parsers = append(c.Parsers, running)
return running, nil
}
func (c *Config) addProcessor(name string, table *ast.Table) error {
creator, ok := processors.Processors[name]
if !ok {
// Handle removed, deprecated plugins
if di, deprecated := processors.Deprecations[name]; deprecated {
printHistoricPluginDeprecationNotice("processors", name, di)
return fmt.Errorf("plugin deprecated")
}
return fmt.Errorf("Undefined but requested processor: %s", name)
}
processorConfig, err := c.buildProcessor(name, table)
if err != nil {
return err
}
rf, err := c.newRunningProcessor(creator, processorConfig, table)
if err != nil {
return err
}
c.Processors = append(c.Processors, rf)
// save a copy for the aggregator
rf, err = c.newRunningProcessor(creator, processorConfig, table)
if err != nil {
return err
}
c.AggProcessors = append(c.AggProcessors, rf)
return nil
}
func (c *Config) newRunningProcessor(
creator processors.StreamingCreator,
processorConfig *models.ProcessorConfig,
table *ast.Table,
) (*models.RunningProcessor, error) {
processor := creator()
if p, ok := processor.(unwrappable); ok {
if err := c.toml.UnmarshalTable(table, p.Unwrap()); err != nil {
return nil, err
}
} else {
if err := c.toml.UnmarshalTable(table, processor); err != nil {
return nil, err
}
}
if err := c.printUserDeprecation("processors", processorConfig.Name, processor); err != nil {
return nil, err
}
rf := models.NewRunningProcessor(processor, processorConfig)
return rf, nil
}
func (c *Config) addOutput(name string, table *ast.Table) error {
if len(c.OutputFilters) > 0 && !sliceContains(name, c.OutputFilters) {
return nil
}
creator, ok := outputs.Outputs[name]
if !ok {
// Handle removed, deprecated plugins
if di, deprecated := outputs.Deprecations[name]; deprecated {
printHistoricPluginDeprecationNotice("outputs", name, di)
return fmt.Errorf("plugin deprecated")
}
return fmt.Errorf("undefined but requested output: %s", name)
}
output := creator()
// If the output has a SetSerializer function, then this means it can write
// arbitrary types of output, so build the serializer and set it.
if t, ok := output.(serializers.SerializerOutput); ok {
serializer, err := c.buildSerializer(table)
if err != nil {
return err
}
t.SetSerializer(serializer)
}
outputConfig, err := c.buildOutput(name, table)
if err != nil {
return err
}
if err := c.toml.UnmarshalTable(table, output); err != nil {
return err
}
if err := c.printUserDeprecation("outputs", name, output); err != nil {
return err
}
if c, ok := interface{}(output).(interface{ TLSConfig() (*tls.Config, error) }); ok {
if _, err := c.TLSConfig(); err != nil {
return err
}
}
ro := models.NewRunningOutput(output, outputConfig, c.Agent.MetricBatchSize, c.Agent.MetricBufferLimit)
c.Outputs = append(c.Outputs, ro)
return nil
}
func (c *Config) addInput(name string, table *ast.Table) error {
if len(c.InputFilters) > 0 && !sliceContains(name, c.InputFilters) {
return nil
}
// Legacy support renaming io input to diskio
if name == "io" {
if err := c.printUserDeprecation("inputs", name, nil); err != nil {
return err
}
name = "diskio"
}
// For inputs with parsers we need to compute the set of
// options that is covered by neither the parser nor the input.
// We achieve this by keeping a local book of missing entries
// counting the number of misses. In case we have a parser
// for the input, both need to miss the entry. We count the
// missing entries at the end.
missThreshold := 0
missCount := make(map[string]int)
c.setLocalMissingTomlFieldTracker(missCount)
defer c.resetMissingTomlFieldTracker()
creator, ok := inputs.Inputs[name]
if !ok {
// Handle removed, deprecated plugins
if di, deprecated := inputs.Deprecations[name]; deprecated {
printHistoricPluginDeprecationNotice("inputs", name, di)
return fmt.Errorf("plugin deprecated")
}
return fmt.Errorf("Undefined but requested input: %s", name)
}
input := creator()
// If the input has a SetParser or SetParserFunc function, it can accept
// arbitrary data-formats, so build the requested parser and set it.
if t, ok := input.(telegraf.ParserInput); ok {
missThreshold = 1
if parser, err := c.addParser(name, table); err == nil {
t.SetParser(parser)
} else {
missThreshold = 0
// Fallback to the old way of instantiating the parsers.
config, err := c.getParserConfig(name, table)
if err != nil {
return err
}
parser, err := c.buildParserOld(name, config)
if err != nil {
return err
}
t.SetParser(parser)
}
}
// Keep the old interface for backward compatibility
if t, ok := input.(parsers.ParserInput); ok {
// DEPRECATED: Please switch your plugin to telegraf.ParserInput.
missThreshold = 1
if parser, err := c.addParser(name, table); err == nil {
t.SetParser(parser)
} else {
missThreshold = 0
// Fallback to the old way of instantiating the parsers.
config, err := c.getParserConfig(name, table)
if err != nil {
return err
}
parser, err := c.buildParserOld(name, config)
if err != nil {
return err
}
t.SetParser(parser)
}
}
if t, ok := input.(telegraf.ParserFuncInput); ok {
missThreshold = 1
if c.probeParser(table) {
t.SetParserFunc(func() (telegraf.Parser, error) {
parser, err := c.addParser(name, table)
if err != nil {
return nil, err
}
err = parser.Init()
return parser, err
})
} else {
missThreshold = 0
// Fallback to the old way
config, err := c.getParserConfig(name, table)
if err != nil {
return err
}
t.SetParserFunc(func() (telegraf.Parser, error) {
return c.buildParserOld(name, config)
})
}
}
if t, ok := input.(parsers.ParserFuncInput); ok {
// DEPRECATED: Please switch your plugin to telegraf.ParserFuncInput.
missThreshold = 1
if c.probeParser(table) {
t.SetParserFunc(func() (parsers.Parser, error) {
parser, err := c.addParser(name, table)
if err != nil {
return nil, err
}
err = parser.Init()
return parser, err
})
} else {
missThreshold = 0
// Fallback to the old way
config, err := c.getParserConfig(name, table)
if err != nil {
return err
}
t.SetParserFunc(func() (parsers.Parser, error) {
return c.buildParserOld(name, config)
})
}
}
pluginConfig, err := c.buildInput(name, table)
if err != nil {
return err
}
if err := c.toml.UnmarshalTable(table, input); err != nil {
return err
}
if err := c.printUserDeprecation("inputs", name, input); err != nil {
return err
}
if c, ok := interface{}(input).(interface{ TLSConfig() (*tls.Config, error) }); ok {
if _, err := c.TLSConfig(); err != nil {
return err
}
}
rp := models.NewRunningInput(input, pluginConfig)
rp.SetDefaultTags(c.Tags)
c.Inputs = append(c.Inputs, rp)
// Check the number of misses against the threshold
for key, count := range missCount {
if count <= missThreshold {
continue
}
if err := c.missingTomlField(nil, key); err != nil {
return err
}
}
return nil
}
// buildAggregator parses Aggregator specific items from the ast.Table,
// builds the filter and returns a
// models.AggregatorConfig to be inserted into models.RunningAggregator
func (c *Config) buildAggregator(name string, tbl *ast.Table) (*models.AggregatorConfig, error) {
conf := &models.AggregatorConfig{
Name: name,
Delay: time.Millisecond * 100,
Period: time.Second * 30,
Grace: time.Second * 0,
}
c.getFieldDuration(tbl, "period", &conf.Period)
c.getFieldDuration(tbl, "delay", &conf.Delay)
c.getFieldDuration(tbl, "grace", &conf.Grace)
c.getFieldBool(tbl, "drop_original", &conf.DropOriginal)
c.getFieldString(tbl, "name_prefix", &conf.MeasurementPrefix)
c.getFieldString(tbl, "name_suffix", &conf.MeasurementSuffix)
c.getFieldString(tbl, "name_override", &conf.NameOverride)
c.getFieldString(tbl, "alias", &conf.Alias)
conf.Tags = make(map[string]string)
if node, ok := tbl.Fields["tags"]; ok {
if subtbl, ok := node.(*ast.Table); ok {
if err := c.toml.UnmarshalTable(subtbl, conf.Tags); err != nil {
return nil, fmt.Errorf("could not parse tags for input %s", name)
}
}
}
if c.hasErrs() {
return nil, c.firstErr()
}
var err error
conf.Filter, err = c.buildFilter(tbl)
if err != nil {
return conf, err
}
return conf, nil
}
// buildParser parses parser-specific items from the ast.Table
// and returns a models.ParserConfig to be inserted into
// models.RunningParser
func (c *Config) buildParser(name string, tbl *ast.Table) (*models.ParserConfig, error) {
var dataformat string
c.getFieldString(tbl, "data_format", &dataformat)
conf := &models.ParserConfig{
Parent: name,
DataFormat: dataformat,
}
return conf, nil
}
// buildProcessor parses Processor specific items from the ast.Table,
// builds the filter and returns a
// models.ProcessorConfig to be inserted into models.RunningProcessor
func (c *Config) buildProcessor(name string, tbl *ast.Table) (*models.ProcessorConfig, error) {
conf := &models.ProcessorConfig{Name: name}
c.getFieldInt64(tbl, "order", &conf.Order)
c.getFieldString(tbl, "alias", &conf.Alias)
if c.hasErrs() {
return nil, c.firstErr()
}
var err error
conf.Filter, err = c.buildFilter(tbl)
if err != nil {
return conf, err
}
return conf, nil
}
// buildFilter builds a Filter
// (tagpass/tagdrop/namepass/namedrop/fieldpass/fielddrop) to
// be inserted into the models.OutputConfig/models.InputConfig
// to be used for glob filtering on tags and measurements
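// An illustrative plugin table using these keys (assuming an inputs.cpu section):
//
//	[[inputs.cpu]]
//	  namepass = ["cpu*"]
//	  tagexclude = ["datacenter"]
//	  [inputs.cpu.tagpass]
//	    host = ["web-*"]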
func (c *Config) buildFilter(tbl *ast.Table) (models.Filter, error) {
f := models.Filter{}
c.getFieldStringSlice(tbl, "namepass", &f.NamePass)
c.getFieldStringSlice(tbl, "namedrop", &f.NameDrop)
c.getFieldStringSlice(tbl, "pass", &f.FieldPass)
c.getFieldStringSlice(tbl, "fieldpass", &f.FieldPass)
c.getFieldStringSlice(tbl, "drop", &f.FieldDrop)
c.getFieldStringSlice(tbl, "fielddrop", &f.FieldDrop)
c.getFieldTagFilter(tbl, "tagpass", &f.TagPass)
c.getFieldTagFilter(tbl, "tagdrop", &f.TagDrop)
c.getFieldStringSlice(tbl, "tagexclude", &f.TagExclude)
c.getFieldStringSlice(tbl, "taginclude", &f.TagInclude)
if c.hasErrs() {
return f, c.firstErr()
}
if err := f.Compile(); err != nil {
return f, err
}
return f, nil
}
// buildInput parses input specific items from the ast.Table,
// builds the filter and returns a
// models.InputConfig to be inserted into models.RunningInput
func (c *Config) buildInput(name string, tbl *ast.Table) (*models.InputConfig, error) {
cp := &models.InputConfig{Name: name}
c.getFieldDuration(tbl, "interval", &cp.Interval)
c.getFieldDuration(tbl, "precision", &cp.Precision)
c.getFieldDuration(tbl, "collection_jitter", &cp.CollectionJitter)
c.getFieldDuration(tbl, "collection_offset", &cp.CollectionOffset)
c.getFieldString(tbl, "name_prefix", &cp.MeasurementPrefix)
c.getFieldString(tbl, "name_suffix", &cp.MeasurementSuffix)
c.getFieldString(tbl, "name_override", &cp.NameOverride)
c.getFieldString(tbl, "alias", &cp.Alias)
cp.Tags = make(map[string]string)
if node, ok := tbl.Fields["tags"]; ok {
if subtbl, ok := node.(*ast.Table); ok {
if err := c.toml.UnmarshalTable(subtbl, cp.Tags); err != nil {
return nil, fmt.Errorf("could not parse tags for input %s", name)
}
}
}
if c.hasErrs() {
return nil, c.firstErr()
}
var err error
cp.Filter, err = c.buildFilter(tbl)
if err != nil {
return cp, err
}
return cp, nil
}
// buildParserOld grabs the necessary entries from the ast.Table for creating
// a parsers.Parser object, and creates it, which can then be added onto
// an Input object.
func (c *Config) buildParserOld(name string, config *parsers.Config) (telegraf.Parser, error) {
parser, err := parsers.NewParser(config)
if err != nil {
return nil, err
}
logger := models.NewLogger("parsers", config.DataFormat, name)
models.SetLoggerOnPlugin(parser, logger)
if initializer, ok := parser.(telegraf.Initializer); ok {
if err := initializer.Init(); err != nil {
return nil, err
}
}
return parser, nil
}
func (c *Config) getParserConfig(name string, tbl *ast.Table) (*parsers.Config, error) {
pc := &parsers.Config{
JSONStrict: true,
}
c.getFieldString(tbl, "data_format", &pc.DataFormat)
// Legacy support, exec plugin originally parsed JSON by default.
if name == "exec" && pc.DataFormat == "" {
pc.DataFormat = "json"
} else if pc.DataFormat == "" {
pc.DataFormat = "influx"
}
c.getFieldString(tbl, "separator", &pc.Separator)
c.getFieldStringSlice(tbl, "templates", &pc.Templates)
c.getFieldStringSlice(tbl, "tag_keys", &pc.TagKeys)
c.getFieldStringSlice(tbl, "json_string_fields", &pc.JSONStringFields)
c.getFieldString(tbl, "json_name_key", &pc.JSONNameKey)
c.getFieldString(tbl, "json_query", &pc.JSONQuery)
c.getFieldString(tbl, "json_time_key", &pc.JSONTimeKey)
c.getFieldString(tbl, "json_time_format", &pc.JSONTimeFormat)
c.getFieldString(tbl, "json_timezone", &pc.JSONTimezone)
c.getFieldBool(tbl, "json_strict", &pc.JSONStrict)
c.getFieldString(tbl, "data_type", &pc.DataType)
c.getFieldString(tbl, "collectd_auth_file", &pc.CollectdAuthFile)
c.getFieldString(tbl, "collectd_security_level", &pc.CollectdSecurityLevel)
c.getFieldString(tbl, "collectd_parse_multivalue", &pc.CollectdSplit)
c.getFieldStringSlice(tbl, "collectd_typesdb", &pc.CollectdTypesDB)
c.getFieldString(tbl, "dropwizard_metric_registry_path", &pc.DropwizardMetricRegistryPath)
c.getFieldString(tbl, "dropwizard_time_path", &pc.DropwizardTimePath)
c.getFieldString(tbl, "dropwizard_time_format", &pc.DropwizardTimeFormat)
c.getFieldString(tbl, "dropwizard_tags_path", &pc.DropwizardTagsPath)
c.getFieldStringMap(tbl, "dropwizard_tag_paths", &pc.DropwizardTagPathsMap)
// for grok data_format
c.getFieldStringSlice(tbl, "grok_named_patterns", &pc.GrokNamedPatterns)
c.getFieldStringSlice(tbl, "grok_patterns", &pc.GrokPatterns)
c.getFieldString(tbl, "grok_custom_patterns", &pc.GrokCustomPatterns)
c.getFieldStringSlice(tbl, "grok_custom_pattern_files", &pc.GrokCustomPatternFiles)
c.getFieldString(tbl, "grok_timezone", &pc.GrokTimezone)
c.getFieldString(tbl, "grok_unique_timestamp", &pc.GrokUniqueTimestamp)
c.getFieldStringSlice(tbl, "form_urlencoded_tag_keys", &pc.FormUrlencodedTagKeys)
c.getFieldString(tbl, "value_field_name", &pc.ValueFieldName)
// for influx parser
c.getFieldString(tbl, "influx_parser_type", &pc.InfluxParserType)
// for XPath parser family
if choice.Contains(pc.DataFormat, []string{"xml", "xpath_json", "xpath_msgpack", "xpath_protobuf"}) {
c.getFieldString(tbl, "xpath_protobuf_file", &pc.XPathProtobufFile)
c.getFieldString(tbl, "xpath_protobuf_type", &pc.XPathProtobufType)
c.getFieldStringSlice(tbl, "xpath_protobuf_import_paths", &pc.XPathProtobufImportPaths)
c.getFieldBool(tbl, "xpath_print_document", &pc.XPathPrintDocument)
// Determine the actual xpath configuration tables
node, xpathOK := tbl.Fields["xpath"]
if !xpathOK {
// Add this for backward compatibility
node, xpathOK = tbl.Fields[pc.DataFormat]
}
if xpathOK {
if subtbls, ok := node.([]*ast.Table); ok {
pc.XPathConfig = make([]parsers.XPathConfig, len(subtbls))
for i, subtbl := range subtbls {
subcfg := pc.XPathConfig[i]
c.getFieldString(subtbl, "metric_name", &subcfg.MetricQuery)
c.getFieldString(subtbl, "metric_selection", &subcfg.Selection)
c.getFieldString(subtbl, "timestamp", &subcfg.Timestamp)
c.getFieldString(subtbl, "timestamp_format", &subcfg.TimestampFmt)
c.getFieldStringMap(subtbl, "tags", &subcfg.Tags)
c.getFieldStringMap(subtbl, "fields", &subcfg.Fields)
c.getFieldStringMap(subtbl, "fields_int", &subcfg.FieldsInt)
c.getFieldString(subtbl, "field_selection", &subcfg.FieldSelection)
c.getFieldBool(subtbl, "field_name_expansion", &subcfg.FieldNameExpand)
c.getFieldString(subtbl, "field_name", &subcfg.FieldNameQuery)
c.getFieldString(subtbl, "field_value", &subcfg.FieldValueQuery)
c.getFieldString(subtbl, "tag_selection", &subcfg.TagSelection)
c.getFieldBool(subtbl, "tag_name_expansion", &subcfg.TagNameExpand)
c.getFieldString(subtbl, "tag_name", &subcfg.TagNameQuery)
c.getFieldString(subtbl, "tag_value", &subcfg.TagValueQuery)
pc.XPathConfig[i] = subcfg
}
}
}
}
// for json_v2 parser
if node, ok := tbl.Fields["json_v2"]; ok {
if metricConfigs, ok := node.([]*ast.Table); ok {
pc.JSONV2Config = make([]parsers.JSONV2Config, len(metricConfigs))
for i, metricConfig := range metricConfigs {
mc := pc.JSONV2Config[i]
c.getFieldString(metricConfig, "measurement_name", &mc.MeasurementName)
if mc.MeasurementName == "" {
mc.MeasurementName = name
}
c.getFieldString(metricConfig, "measurement_name_path", &mc.MeasurementNamePath)
c.getFieldString(metricConfig, "timestamp_path", &mc.TimestampPath)
c.getFieldString(metricConfig, "timestamp_format", &mc.TimestampFormat)
c.getFieldString(metricConfig, "timestamp_timezone", &mc.TimestampTimezone)
mc.Fields = getFieldSubtable(c, metricConfig)
mc.Tags = getTagSubtable(c, metricConfig)
if objectconfigs, ok := metricConfig.Fields["object"]; ok {
if objectconfigs, ok := objectconfigs.([]*ast.Table); ok {
for _, objectConfig := range objectconfigs {
var o json_v2.JSONObject
c.getFieldString(objectConfig, "path", &o.Path)
c.getFieldBool(objectConfig, "optional", &o.Optional)
c.getFieldString(objectConfig, "timestamp_key", &o.TimestampKey)
c.getFieldString(objectConfig, "timestamp_format", &o.TimestampFormat)
c.getFieldString(objectConfig, "timestamp_timezone", &o.TimestampTimezone)
c.getFieldBool(objectConfig, "disable_prepend_keys", &o.DisablePrependKeys)
c.getFieldStringSlice(objectConfig, "included_keys", &o.IncludedKeys)
c.getFieldStringSlice(objectConfig, "excluded_keys", &o.ExcludedKeys)
c.getFieldStringSlice(objectConfig, "tags", &o.Tags)
c.getFieldStringMap(objectConfig, "renames", &o.Renames)
c.getFieldStringMap(objectConfig, "fields", &o.Fields)
o.FieldPaths = getFieldSubtable(c, objectConfig)
o.TagPaths = getTagSubtable(c, objectConfig)
mc.JSONObjects = append(mc.JSONObjects, o)
}
}
}
pc.JSONV2Config[i] = mc
}
}
}
pc.MetricName = name
if c.hasErrs() {
return nil, c.firstErr()
}
return pc, nil
}
func getFieldSubtable(c *Config, metricConfig *ast.Table) []json_v2.DataSet {
var fields []json_v2.DataSet
if fieldConfigs, ok := metricConfig.Fields["field"]; ok {
if fieldConfigs, ok := fieldConfigs.([]*ast.Table); ok {
for _, fieldconfig := range fieldConfigs {
var f json_v2.DataSet
c.getFieldString(fieldconfig, "path", &f.Path)
c.getFieldString(fieldconfig, "rename", &f.Rename)
c.getFieldString(fieldconfig, "type", &f.Type)
c.getFieldBool(fieldconfig, "optional", &f.Optional)
fields = append(fields, f)
}
}
}
return fields
}
func getTagSubtable(c *Config, metricConfig *ast.Table) []json_v2.DataSet {
var tags []json_v2.DataSet
if fieldConfigs, ok := metricConfig.Fields["tag"]; ok {
if fieldConfigs, ok := fieldConfigs.([]*ast.Table); ok {
for _, fieldconfig := range fieldConfigs {
var t json_v2.DataSet
c.getFieldString(fieldconfig, "path", &t.Path)
c.getFieldString(fieldconfig, "rename", &t.Rename)
t.Type = "string"
tags = append(tags, t)
c.getFieldBool(fieldconfig, "optional", &t.Optional)
}
}
}
return tags
}
// buildSerializer grabs the necessary entries from the ast.Table for creating
// a serializers.Serializer object, and creates it, which can then be added onto
// an Output object.
func (c *Config) buildSerializer(tbl *ast.Table) (serializers.Serializer, error) {
sc := &serializers.Config{TimestampUnits: 1 * time.Second}
c.getFieldString(tbl, "data_format", &sc.DataFormat)
if sc.DataFormat == "" {
sc.DataFormat = "influx"
}
c.getFieldString(tbl, "prefix", &sc.Prefix)
c.getFieldString(tbl, "template", &sc.Template)
c.getFieldStringSlice(tbl, "templates", &sc.Templates)
c.getFieldString(tbl, "carbon2_format", &sc.Carbon2Format)
c.getFieldString(tbl, "carbon2_sanitize_replace_char", &sc.Carbon2SanitizeReplaceChar)
c.getFieldInt(tbl, "influx_max_line_bytes", &sc.InfluxMaxLineBytes)
c.getFieldBool(tbl, "influx_sort_fields", &sc.InfluxSortFields)
c.getFieldBool(tbl, "influx_uint_support", &sc.InfluxUintSupport)
c.getFieldBool(tbl, "graphite_tag_support", &sc.GraphiteTagSupport)
c.getFieldString(tbl, "graphite_tag_sanitize_mode", &sc.GraphiteTagSanitizeMode)
c.getFieldString(tbl, "graphite_separator", &sc.GraphiteSeparator)
c.getFieldDuration(tbl, "json_timestamp_units", &sc.TimestampUnits)
c.getFieldString(tbl, "json_timestamp_format", &sc.TimestampFormat)
c.getFieldBool(tbl, "splunkmetric_hec_routing", &sc.HecRouting)
c.getFieldBool(tbl, "splunkmetric_multimetric", &sc.SplunkmetricMultiMetric)
c.getFieldStringSlice(tbl, "wavefront_source_override", &sc.WavefrontSourceOverride)
c.getFieldBool(tbl, "wavefront_use_strict", &sc.WavefrontUseStrict)
c.getFieldBool(tbl, "wavefront_disable_prefix_conversion", &sc.WavefrontDisablePrefixConversion)
c.getFieldBool(tbl, "prometheus_export_timestamp", &sc.PrometheusExportTimestamp)
c.getFieldBool(tbl, "prometheus_sort_metrics", &sc.PrometheusSortMetrics)
c.getFieldBool(tbl, "prometheus_string_as_label", &sc.PrometheusStringAsLabel)
if c.hasErrs() {
return nil, c.firstErr()
}
return serializers.NewSerializer(sc)
}
// buildOutput parses output specific items from the ast.Table,
// builds the filter and returns a
// models.OutputConfig to be inserted into models.RunningOutput.
// Note: the error return is kept for future calls that might require it.
func (c *Config) buildOutput(name string, tbl *ast.Table) (*models.OutputConfig, error) {
filter, err := c.buildFilter(tbl)
if err != nil {
return nil, err
}
oc := &models.OutputConfig{
Name: name,
Filter: filter,
}
// TODO: support FieldPass/FieldDrop on outputs
c.getFieldDuration(tbl, "flush_interval", &oc.FlushInterval)
c.getFieldDuration(tbl, "flush_jitter", &oc.FlushJitter)
c.getFieldInt(tbl, "metric_buffer_limit", &oc.MetricBufferLimit)
c.getFieldInt(tbl, "metric_batch_size", &oc.MetricBatchSize)
c.getFieldString(tbl, "alias", &oc.Alias)
c.getFieldString(tbl, "name_override", &oc.NameOverride)
c.getFieldString(tbl, "name_suffix", &oc.NameSuffix)
c.getFieldString(tbl, "name_prefix", &oc.NamePrefix)
if c.hasErrs() {
return nil, c.firstErr()
}
return oc, nil
}
func (c *Config) missingTomlField(_ reflect.Type, key string) error {
switch key {
case "alias", "carbon2_format", "carbon2_sanitize_replace_char", "collectd_auth_file",
"collectd_parse_multivalue", "collectd_security_level", "collectd_typesdb", "collection_jitter",
"collection_offset",
"data_format", "data_type", "delay", "drop", "drop_original", "dropwizard_metric_registry_path",
"dropwizard_tag_paths", "dropwizard_tags_path", "dropwizard_time_format", "dropwizard_time_path",
"fielddrop", "fieldpass", "flush_interval", "flush_jitter", "form_urlencoded_tag_keys",
"grace", "graphite_separator", "graphite_tag_sanitize_mode", "graphite_tag_support",
"grok_custom_pattern_files", "grok_custom_patterns", "grok_named_patterns", "grok_patterns",
"grok_timezone", "grok_unique_timestamp", "influx_max_line_bytes", "influx_parser_type", "influx_sort_fields",
"influx_uint_support", "interval", "json_name_key", "json_query", "json_strict",
"json_string_fields", "json_time_format", "json_time_key", "json_timestamp_format", "json_timestamp_units", "json_timezone", "json_v2",
"lvm", "metric_batch_size", "metric_buffer_limit", "name_override", "name_prefix",
"name_suffix", "namedrop", "namepass", "order", "pass", "period", "precision",
"prefix", "prometheus_export_timestamp", "prometheus_ignore_timestamp", "prometheus_sort_metrics", "prometheus_string_as_label",
"separator", "splunkmetric_hec_routing", "splunkmetric_multimetric", "tag_keys",
"tagdrop", "tagexclude", "taginclude", "tagpass", "tags", "template", "templates",
"value_field_name", "wavefront_source_override", "wavefront_use_strict", "wavefront_disable_prefix_conversion",
"xml", "xpath", "xpath_json", "xpath_msgpack", "xpath_protobuf", "xpath_print_document",
"xpath_protobuf_file", "xpath_protobuf_type", "xpath_protobuf_import_paths":
// ignore fields that are common to all plugins.
default:
c.UnusedFields[key] = true
}
return nil
}
func (c *Config) setLocalMissingTomlFieldTracker(counter map[string]int) {
f := func(_ reflect.Type, key string) error {
counter[key]++
return nil
}
c.toml.MissingField = f
}
func (c *Config) resetMissingTomlFieldTracker() {
c.toml.MissingField = c.missingTomlField
}
func (c *Config) getFieldString(tbl *ast.Table, fieldName string, target *string) {
if node, ok := tbl.Fields[fieldName]; ok {
if kv, ok := node.(*ast.KeyValue); ok {
if str, ok := kv.Value.(*ast.String); ok {
*target = str.Value
}
}
}
}
func (c *Config) getFieldDuration(tbl *ast.Table, fieldName string, target interface{}) {
if node, ok := tbl.Fields[fieldName]; ok {
if kv, ok := node.(*ast.KeyValue); ok {
if str, ok := kv.Value.(*ast.String); ok {
d, err := time.ParseDuration(str.Value)
if err != nil {
c.addError(tbl, fmt.Errorf("error parsing duration: %w", err))
return
}
targetVal := reflect.ValueOf(target).Elem()
targetVal.Set(reflect.ValueOf(d))
}
}
}
}
func (c *Config) getFieldBool(tbl *ast.Table, fieldName string, target *bool) {
var err error
if node, ok := tbl.Fields[fieldName]; ok {
if kv, ok := node.(*ast.KeyValue); ok {
switch t := kv.Value.(type) {
case *ast.Boolean:
*target, err = t.Boolean()
if err != nil {
c.addError(tbl, fmt.Errorf("unknown boolean value type %q, expecting boolean", kv.Value))
return
}
case *ast.String:
*target, err = strconv.ParseBool(t.Value)
if err != nil {
c.addError(tbl, fmt.Errorf("unknown boolean value type %q, expecting boolean", kv.Value))
return
}
default:
c.addError(tbl, fmt.Errorf("unknown boolean value type %q, expecting boolean", kv.Value.Source()))
return
}
}
}
}
func (c *Config) getFieldInt(tbl *ast.Table, fieldName string, target *int) {
if node, ok := tbl.Fields[fieldName]; ok {
if kv, ok := node.(*ast.KeyValue); ok {
if iAst, ok := kv.Value.(*ast.Integer); ok {
i, err := iAst.Int()
if err != nil {
c.addError(tbl, fmt.Errorf("unexpected int type %q, expecting int", iAst.Value))
return
}
*target = int(i)
}
}
}
}
func (c *Config) getFieldInt64(tbl *ast.Table, fieldName string, target *int64) {
if node, ok := tbl.Fields[fieldName]; ok {
if kv, ok := node.(*ast.KeyValue); ok {
if iAst, ok := kv.Value.(*ast.Integer); ok {
i, err := iAst.Int()
if err != nil {
c.addError(tbl, fmt.Errorf("unexpected int type %q, expecting int", iAst.Value))
return
}
*target = i
}
}
}
}
func (c *Config) getFieldStringSlice(tbl *ast.Table, fieldName string, target *[]string) {
if node, ok := tbl.Fields[fieldName]; ok {
if kv, ok := node.(*ast.KeyValue); ok {
ary, ok := kv.Value.(*ast.Array)
if !ok {
c.addError(tbl, fmt.Errorf("found unexpected format while parsing %q, expecting string array/slice format", fieldName))
return
}
for _, elem := range ary.Value {
if str, ok := elem.(*ast.String); ok {
*target = append(*target, str.Value)
}
}
}
}
}
func (c *Config) getFieldTagFilter(tbl *ast.Table, fieldName string, target *[]models.TagFilter) {
if node, ok := tbl.Fields[fieldName]; ok {
if subtbl, ok := node.(*ast.Table); ok {
for name, val := range subtbl.Fields {
if kv, ok := val.(*ast.KeyValue); ok {
ary, ok := kv.Value.(*ast.Array)
if !ok {
c.addError(tbl, fmt.Errorf("found unexpected format while parsing %q, expecting string array/slice format on each entry", fieldName))
return
}
tagFilter := models.TagFilter{Name: name}
for _, elem := range ary.Value {
if str, ok := elem.(*ast.String); ok {
tagFilter.Filter = append(tagFilter.Filter, str.Value)
}
}
*target = append(*target, tagFilter)
}
}
}
}
}
func (c *Config) getFieldStringMap(tbl *ast.Table, fieldName string, target *map[string]string) {
*target = map[string]string{}
if node, ok := tbl.Fields[fieldName]; ok {
if subtbl, ok := node.(*ast.Table); ok {
for name, val := range subtbl.Fields {
if kv, ok := val.(*ast.KeyValue); ok {
if str, ok := kv.Value.(*ast.String); ok {
(*target)[name] = str.Value
}
}
}
}
}
}
func keys(m map[string]bool) []string {
result := []string{}
for k := range m {
result = append(result, k)
}
return result
}
func (c *Config) hasErrs() bool {
return len(c.errs) > 0
}
func (c *Config) firstErr() error {
if len(c.errs) == 0 {
return nil
}
return c.errs[0]
}
func (c *Config) addError(tbl *ast.Table, err error) {
c.errs = append(c.errs, fmt.Errorf("line %d:%d: %w", tbl.Line, tbl.Position, err))
}
// unwrappable lets you retrieve the original telegraf.Processor from the
// StreamingProcessor. This is necessary because the toml Unmarshaller won't
// look inside composed types.
type unwrappable interface {
Unwrap() telegraf.Processor
}
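// A streaming wrapper would satisfy it roughly as follows (hypothetical sketch,
// not the actual wrapper type used by the processors registry):
//
//	type wrappedProcessor struct{ proc telegraf.Processor }
//
//	func (w *wrappedProcessor) Unwrap() telegraf.Processor { return w.proc }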