plugins/parsers/json_v2/parser.go
package json_v2
import (
"fmt"
"strconv"
"strings"
"time"
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/internal"
"github.com/influxdata/telegraf/metric"
"github.com/tidwall/gjson"
)
// Parser adheres to the parser interface and contains the parser configuration and the data required to parse JSON
type Parser struct {
// These struct fields are common for a parser
Configs []Config
DefaultTags map[string]string
Log telegraf.Logger
// **** The struct fields below this comment are used for processing individual configs ****
// measurementName is the measurement name taken from the current config, used for every metric it produces
measurementName string
// **** Specific for object configuration ****
// subPathResults contains the results of the sub-gjson path expressions provided in the field_paths/tag_paths tables within an object config
subPathResults []PathResult
// iterateObjects dictates whether the expandArray function will handle objects
iterateObjects bool
// objectConfig contains the config for the object currently being processed; some of its settings are needed while iterating over the gjson results
objectConfig JSONObject
}
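// PathResult pairs a gjson result returned by a field_paths/tag_paths sub-query
// with the DataSet config that produced it and records whether it maps to a tag.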
type PathResult struct {
result gjson.Result
tag bool
DataSet
}
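// Config holds the settings for a single json_v2 configuration table.
//
// A minimal sketch of how this maps to TOML, assuming the file input plugin and
// the field/tag/object sub-table names documented in this plugin's README; the
// paths and values are illustrative only:
//
//	[[inputs.file.json_v2]]
//	  measurement_name = "sensors"
//	  timestamp_path = "time"
//	  timestamp_format = "unix"
//	  [[inputs.file.json_v2.field]]
//	    path = "temp"
//	    type = "float"
//	  [[inputs.file.json_v2.tag]]
//	    path = "location"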
type Config struct {
MeasurementName string `toml:"measurement_name"` // OPTIONAL
MeasurementNamePath string `toml:"measurement_name_path"` // OPTIONAL
TimestampPath string `toml:"timestamp_path"` // OPTIONAL
TimestampFormat string `toml:"timestamp_format"` // OPTIONAL, but REQUIRED when timestamp_path is defined
TimestampTimezone string `toml:"timestamp_timezone"` // OPTIONAL, but REQUIRES timestamp_path
Fields []DataSet
Tags []DataSet
JSONObjects []JSONObject
}
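// DataSet describes a single field or tag gathered from a GJSON path expression.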
type DataSet struct {
Path string `toml:"path"` // REQUIRED
Type string `toml:"type"` // OPTIONAL, can't be set for tags; they will always be strings
Rename string `toml:"rename"`
Optional bool `toml:"optional"` // Will suppress errors if there isn't a match with Path
}
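// JSONObject holds the settings for gathering metrics from a JSON object or an
// array of objects.
//
// A minimal sketch of the corresponding TOML, assuming the file input plugin and
// the object sub-table name documented in this plugin's README; the paths and
// values are illustrative only:
//
//	[[inputs.file.json_v2.object]]
//	  path = "devices"
//	  timestamp_key = "time"
//	  timestamp_format = "unix"
//	  tags = ["name"]
//	  disable_prepend_keys = true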
type JSONObject struct {
Path string `toml:"path"` // REQUIRED
Optional bool `toml:"optional"` // Will suppress errors if there isn't a match with Path
TimestampKey string `toml:"timestamp_key"`
TimestampFormat string `toml:"timestamp_format"` // OPTIONAL, but REQUIRED when timestamp_key is defined
TimestampTimezone string `toml:"timestamp_timezone"` // OPTIONAL, but REQUIRES timestamp_key
Renames map[string]string `toml:"renames"`
Fields map[string]string `toml:"fields"`
Tags []string `toml:"tags"`
IncludedKeys []string `toml:"included_keys"`
ExcludedKeys []string `toml:"excluded_keys"`
DisablePrependKeys bool `toml:"disable_prepend_keys"`
FieldPaths []DataSet
TagPaths []DataSet
}
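// MetricNode wraps a gjson.Result together with the metric being built and the
// naming/typing state needed while recursively expanding arrays and objects.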
type MetricNode struct {
ParentIndex int
OutputName string
SetName string
Tag bool
DesiredType string // Can be "int", "uint", "float", "bool", "string"
/*
IncludeCollection is only used when processing objects and is responsible for containing the gjson results
found by the gjson paths provided in the FieldPaths and TagPaths configs.
*/
IncludeCollection *PathResult
Metric telegraf.Metric
gjson.Result
}
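// Parse converts JSON input into telegraf metrics according to the configured Configs.
//
// As a rough illustration, a config with measurement_name = "example", a field
// with path "temp", and a tag with path "location" would turn the input
// {"temp": 22.5, "location": "kitchen"} into a single metric equivalent to the
// line protocol "example,location=kitchen temp=22.5", timestamped with the
// current time.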
func (p *Parser) Parse(input []byte) ([]telegraf.Metric, error) {
// Only valid JSON is supported
if !gjson.Valid(string(input)) {
return nil, fmt.Errorf("invalid JSON provided, unable to parse")
}
var metrics []telegraf.Metric
for _, c := range p.Configs {
// Measurement name can either be hardcoded, or parsed from the JSON using a GJSON path expression
p.measurementName = c.MeasurementName
if c.MeasurementNamePath != "" {
result := gjson.GetBytes(input, c.MeasurementNamePath)
if !result.IsArray() && !result.IsObject() {
p.measurementName = result.String()
}
}
// timestamp defaults to current time, or can be parsed from the JSON using a GJSON path expression
timestamp := time.Now()
if c.TimestampPath != "" {
result := gjson.GetBytes(input, c.TimestampPath)
if result.Type == gjson.Null {
p.Log.Debugf("Message: %s", input)
return nil, fmt.Errorf("The timestamp path %s returned NULL", c.TimestampPath)
}
if !result.IsArray() && !result.IsObject() {
if c.TimestampFormat == "" {
err := fmt.Errorf("use of 'timestamp_query' requires 'timestamp_format'")
return nil, err
}
var err error
timestamp, err = internal.ParseTimestamp(c.TimestampFormat, result.String(), c.TimestampTimezone)
if err != nil {
return nil, err
}
}
}
fields, err := p.processMetric(input, c.Fields, false, timestamp)
if err != nil {
return nil, err
}
tags, err := p.processMetric(input, c.Tags, true, timestamp)
if err != nil {
return nil, err
}
objects, err := p.processObjects(input, c.JSONObjects, timestamp)
if err != nil {
return nil, err
}
metrics = append(metrics, cartesianProduct(tags, fields)...)
if len(objects) != 0 && len(metrics) != 0 {
metrics = cartesianProduct(objects, metrics)
} else {
metrics = append(metrics, objects...)
}
}
for k, v := range p.DefaultTags {
for _, t := range metrics {
t.AddTag(k, v)
}
}
return metrics, nil
}
// processMetric will iterate over all 'field' or 'tag' configs and create metrics for each
// A field/tag can either be a single value or an array of values, each resulting in its own metric
// For multiple configs, a set of metrics is created from the cartesian product of each separate config
func (p *Parser) processMetric(input []byte, data []DataSet, tag bool, timestamp time.Time) ([]telegraf.Metric, error) {
if len(data) == 0 {
return nil, nil
}
p.iterateObjects = false
var metrics [][]telegraf.Metric
for _, c := range data {
if c.Path == "" {
return nil, fmt.Errorf("GJSON path is required")
}
result := gjson.GetBytes(input, c.Path)
if skip, err := p.checkResult(result, c.Path, c.Optional); err != nil {
if skip {
continue
}
return nil, err
}
if result.IsObject() {
p.Log.Debugf("Found object in the path: %s, ignoring it please use 'object' to gather metrics from objects", c.Path)
continue
}
setName := c.Rename
// Default to the last element of the path, which should be the key name
if setName == "" {
s := strings.Split(c.Path, ".")
setName = s[len(s)-1]
}
setName = strings.ReplaceAll(setName, " ", "_")
mNode := MetricNode{
OutputName: setName,
SetName: setName,
DesiredType: c.Type,
Tag: tag,
Metric: metric.New(
p.measurementName,
map[string]string{},
map[string]interface{}{},
timestamp,
),
Result: result,
}
// Expand all arrays and nested arrays into separate metrics
nodes, err := p.expandArray(mNode, timestamp)
if err != nil {
return nil, err
}
metrics = append(metrics, nodes)
}
for i := 1; i < len(metrics); i++ {
metrics[i] = cartesianProduct(metrics[i-1], metrics[i])
}
if len(metrics) == 0 {
return nil, nil
}
return metrics[len(metrics)-1], nil
}
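// cartesianProduct merges every metric in a with every metric in b, producing
// len(a)*len(b) metrics; if either slice is empty the other is returned as-is.
// For example, two tag-only metrics {host=a} and {host=b} combined with one
// field-only metric {temp=1} yield {host=a temp=1} and {host=b temp=1}.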
func cartesianProduct(a, b []telegraf.Metric) []telegraf.Metric {
if len(a) == 0 {
return b
}
if len(b) == 0 {
return a
}
p := make([]telegraf.Metric, len(a)*len(b))
i := 0
for _, a := range a {
for _, b := range b {
m := a.Copy()
mergeMetric(b, m)
p[i] = m
i++
}
}
return p
}
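// mergeMetric copies all fields and tags from a into m.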
func mergeMetric(a telegraf.Metric, m telegraf.Metric) {
for _, f := range a.FieldList() {
m.AddField(f.Key, f.Value)
}
for _, t := range a.TagList() {
m.AddTag(t.Key, t.Value)
}
}
// expandArray will recursively create a new MetricNode for each element in a JSON array or single value
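//
// For example, a field named "value" whose GJSON result is the array [1, 2]
// produces two metrics, one with value=1 and the other with value=2.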
func (p *Parser) expandArray(result MetricNode, timestamp time.Time) ([]telegraf.Metric, error) {
var results []telegraf.Metric
if result.IsObject() {
if !p.iterateObjects {
p.Log.Debugf("Found object in query ignoring it please use 'object' to gather metrics from objects")
return results, nil
}
r, err := p.combineObject(result, timestamp)
if err != nil {
return nil, err
}
results = append(results, r...)
return results, nil
}
if result.IsArray() {
var err error
if result.IncludeCollection == nil && (len(p.objectConfig.FieldPaths) > 0 || len(p.objectConfig.TagPaths) > 0) {
result.IncludeCollection = p.existsInpathResults(result.Index)
}
result.ForEach(func(_, val gjson.Result) bool {
m := metric.New(
p.measurementName,
map[string]string{},
map[string]interface{}{},
timestamp,
)
if val.IsObject() {
n := result
n.ParentIndex += val.Index
n.Metric = m
n.Result = val
// Assign to the outer err so errors from combineObject are not lost when the iteration stops
var r []telegraf.Metric
r, err = p.combineObject(n, timestamp)
if err != nil {
return false
}
results = append(results, r...)
if len(results) != 0 {
for _, newResult := range results {
mergeMetric(result.Metric, newResult)
}
}
return true
}
mergeMetric(result.Metric, m)
n := result
n.ParentIndex += val.Index
n.Metric = m
n.Result = val
// Assign to the outer err so errors from expandArray are not lost when the iteration stops
var r []telegraf.Metric
r, err = p.expandArray(n, timestamp)
if err != nil {
return false
}
results = append(results, r...)
return true
})
if err != nil {
return nil, err
}
} else {
if p.objectConfig.TimestampKey != "" && result.SetName == p.objectConfig.TimestampKey {
if p.objectConfig.TimestampFormat == "" {
err := fmt.Errorf("use of 'timestamp_query' requires 'timestamp_format'")
return nil, err
}
timestamp, err := internal.ParseTimestamp(p.objectConfig.TimestampFormat, result.String(), p.objectConfig.TimestampTimezone)
if err != nil {
return nil, err
}
result.Metric.SetTime(timestamp)
} else {
switch result.Value().(type) {
case nil: // Ignore JSON values that are set as null
default:
outputName := result.OutputName
desiredType := result.DesiredType
if len(p.objectConfig.FieldPaths) > 0 || len(p.objectConfig.TagPaths) > 0 {
var pathResult *PathResult
// When IncludeCollection isn't nil, that means the current result is included in the collection.
if result.IncludeCollection != nil {
pathResult = result.IncludeCollection
} else {
// Verify that the result should be included based on the results of field_paths and tag_paths
pathResult = p.existsInpathResults(result.ParentIndex)
}
if pathResult == nil {
return results, nil
}
if pathResult.tag {
result.Tag = true
}
if !pathResult.tag {
desiredType = pathResult.Type
}
if pathResult.Rename != "" {
outputName = pathResult.Rename
}
}
if result.Tag {
desiredType = "string"
}
v, err := p.convertType(result.Result, desiredType, result.SetName)
if err != nil {
return nil, err
}
if result.Tag {
result.Metric.AddTag(outputName, v.(string))
} else {
result.Metric.AddField(outputName, v)
}
}
}
results = append(results, result.Metric)
}
return results, nil
}
func (p *Parser) existsInpathResults(index int) *PathResult {
for _, f := range p.subPathResults {
if f.result.Index == index {
return &f
}
// Indexes will be populated with all the elements that match on a `#(...)#` query
for _, i := range f.result.Indexes {
if i == index {
return &f
}
}
}
return nil
}
// processObjects will iterate over all 'object' configs and create metrics for each
func (p *Parser) processObjects(input []byte, objects []JSONObject, timestamp time.Time) ([]telegraf.Metric, error) {
p.iterateObjects = true
var t []telegraf.Metric
for _, c := range objects {
p.objectConfig = c
if c.Path == "" {
return nil, fmt.Errorf("GJSON path is required")
}
result := gjson.GetBytes(input, c.Path)
if skip, err := p.checkResult(result, c.Path, c.Optional); err != nil {
if skip {
continue
}
return nil, err
}
scopedJSON := []byte(result.Raw)
for _, f := range c.FieldPaths {
var r PathResult
r.result = gjson.GetBytes(scopedJSON, f.Path)
if skip, err := p.checkResult(r.result, f.Path, f.Optional); err != nil {
if skip {
continue
}
return nil, err
}
r.DataSet = f
p.subPathResults = append(p.subPathResults, r)
}
for _, f := range c.TagPaths {
var r PathResult
r.result = gjson.GetBytes(scopedJSON, f.Path)
if skip, err := p.checkResult(r.result, f.Path, f.Optional); err != nil {
if skip {
continue
}
return nil, err
}
r.DataSet = f
r.tag = true
p.subPathResults = append(p.subPathResults, r)
}
rootObject := MetricNode{
ParentIndex: 0,
Metric: metric.New(
p.measurementName,
map[string]string{},
map[string]interface{}{},
timestamp,
),
Result: result,
}
metrics, err := p.expandArray(rootObject, timestamp)
if err != nil {
return nil, err
}
t = append(t, metrics...)
}
return t, nil
}
// combineObject will add all fields/tags to a single metric
// If the object has multiple arrays as elements it won't combine those; they will remain separate metrics
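//
// For example, the object {"cpu": {"usage": 42, "name": "cpu0"}} gathered with
// "cpu_name" configured as a tag yields one metric with the field cpu_usage=42
// and the tag cpu_name=cpu0 (key names are prefixed with the parent key unless
// disable_prepend_keys is set).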
func (p *Parser) combineObject(result MetricNode, timestamp time.Time) ([]telegraf.Metric, error) {
var results []telegraf.Metric
if result.IsArray() || result.IsObject() {
var err error
result.ForEach(func(key, val gjson.Result) bool {
// Determine if field/tag set name is configured
var setName string
if result.SetName != "" {
setName = result.SetName + "_" + strings.ReplaceAll(key.String(), " ", "_")
} else {
setName = strings.ReplaceAll(key.String(), " ", "_")
}
if p.isExcluded(setName) || !p.isIncluded(setName, val) {
return true
}
var outputName string
if p.objectConfig.DisablePrependKeys {
outputName = strings.ReplaceAll(key.String(), " ", "_")
} else {
outputName = setName
}
for k, n := range p.objectConfig.Renames {
if k == setName {
outputName = n
break
}
}
arrayNode := result
arrayNode.ParentIndex += val.Index
arrayNode.OutputName = outputName
arrayNode.SetName = setName
arrayNode.Result = val
for k, t := range p.objectConfig.Fields {
if setName == k {
arrayNode.DesiredType = t
break
}
}
tag := false
for _, t := range p.objectConfig.Tags {
if setName == t {
tag = true
break
}
}
arrayNode.Tag = tag
if val.IsObject() {
results, err = p.combineObject(arrayNode, timestamp)
if err != nil {
return false
}
} else {
// Assign to the outer err so errors from expandArray are not lost when the iteration stops
var r []telegraf.Metric
r, err = p.expandArray(arrayNode, timestamp)
if err != nil {
return false
}
results = cartesianProduct(r, results)
}
return true
})
if err != nil {
return nil, err
}
}
return results, nil
}
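// isIncluded reports whether a key should be gathered based on included_keys.
// Tags are always treated as included, and for arrays and objects an included
// key that starts with the current key name allows descending into that
// sub-element.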
func (p *Parser) isIncluded(key string, val gjson.Result) bool {
if len(p.objectConfig.IncludedKeys) == 0 {
return true
}
// Tags are automatically added to included_keys so they do NOT have to be repeated in the config
allKeys := append(p.objectConfig.IncludedKeys, p.objectConfig.Tags...)
for _, i := range allKeys {
if i == key {
return true
}
if val.IsArray() || val.IsObject() {
// Check if the included key is a sub element
if strings.HasPrefix(i, key) {
return true
}
}
}
return false
}
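// isExcluded reports whether a key appears in excluded_keys.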
func (p *Parser) isExcluded(key string) bool {
for _, i := range p.objectConfig.ExcludedKeys {
if i == key {
return true
}
}
return false
}
func (p *Parser) ParseLine(line string) (telegraf.Metric, error) {
return nil, fmt.Errorf("ParseLine is designed for parsing influx line protocol, therefore not implemented for parsing JSON")
}
func (p *Parser) SetDefaultTags(tags map[string]string) {
p.DefaultTags = tags
}
// convertType will convert the value parsed from the input JSON to the specified type in the config
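//
// For example, the JSON string "42" with desired type "int" becomes int64(42),
// and the JSON value true with desired type "int" becomes int64(1).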
func (p *Parser) convertType(input gjson.Result, desiredType string, name string) (interface{}, error) {
switch inputType := input.Value().(type) {
case string:
switch desiredType {
case "uint":
r, err := strconv.ParseUint(inputType, 10, 64)
if err != nil {
return nil, fmt.Errorf("Unable to convert field '%s' to type uint: %v", name, err)
}
return r, nil
case "int":
r, err := strconv.ParseInt(inputType, 10, 64)
if err != nil {
return nil, fmt.Errorf("Unable to convert field '%s' to type int: %v", name, err)
}
return r, nil
case "float":
r, err := strconv.ParseFloat(inputType, 64)
if err != nil {
return nil, fmt.Errorf("Unable to convert field '%s' to type float: %v", name, err)
}
return r, nil
case "bool":
r, err := strconv.ParseBool(inputType)
if err != nil {
return nil, fmt.Errorf("Unable to convert field '%s' to type bool: %v", name, err)
}
return r, nil
}
case bool:
switch desiredType {
case "string":
return strconv.FormatBool(inputType), nil
case "int":
if inputType {
return int64(1), nil
}
return int64(0), nil
case "uint":
if inputType {
return uint64(1), nil
}
return uint64(0), nil
}
case float64:
switch desiredType {
case "string":
return fmt.Sprint(inputType), nil
case "int":
return input.Int(), nil
case "uint":
return input.Uint(), nil
case "bool":
if inputType == 0 {
return false, nil
} else if inputType == 1 {
return true, nil
} else {
return nil, fmt.Errorf("Unable to convert field '%s' to type bool", name)
}
}
default:
return nil, fmt.Errorf("unknown format '%T' for field '%s'", inputType, name)
}
return input.Value(), nil
}
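// checkResult verifies that a GJSON query returned a result. A missing result
// is skipped (skip=true) when the path is marked optional and reported as an
// error otherwise.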
func (p *Parser) checkResult(result gjson.Result, path string, optional bool) (bool, error) {
if !result.Exists() {
if optional {
// If path is marked as optional don't error if path doesn't return a result
p.Log.Debugf("the path %s doesn't exist", path)
return true, nil
}
return false, fmt.Errorf("the path %s doesn't exist", path)
}
return false, nil
}