libbeat/processors/actions/decode_json_fields.go (213 lines of code) (raw):
// Licensed to Elasticsearch B.V. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package actions
import (
"encoding/json"
"errors"
"fmt"
"io"
"strings"
"github.com/elastic/beats/v7/libbeat/beat"
"github.com/elastic/beats/v7/libbeat/beat/events"
"github.com/elastic/beats/v7/libbeat/common/jsontransform"
"github.com/elastic/beats/v7/libbeat/processors"
"github.com/elastic/beats/v7/libbeat/processors/checks"
jsprocessor "github.com/elastic/beats/v7/libbeat/processors/script/javascript/module/processor/registry"
cfg "github.com/elastic/elastic-agent-libs/config"
"github.com/elastic/elastic-agent-libs/logp"
"github.com/elastic/elastic-agent-libs/mapstr"
)
// decodeJSONFields is the processor registered as "decode_json_fields". For
// every configured field that holds a string it decodes the string as JSON and
// writes the decoded value back into the event (see Run).
type decodeJSONFields struct {
// fields lists the event fields whose values are decoded.
fields []string
// maxDepth is the decoding depth handed to unmarshal: 1 decodes only the
// field itself, larger values also decode JSON strings nested in the result.
maxDepth int
// expandKeys is forwarded to jsontransform: with a non-empty target it
// triggers ExpandFields on a decoded object, otherwise it is passed to
// WriteJSONKeys.
expandKeys bool
// overwriteKeys is passed to jsontransform.WriteJSONKeys when the decoded
// object is written to the event root (empty target).
overwriteKeys bool
// addErrorKey is passed along to the event/jsontransform helpers that record
// decoding errors on the event.
addErrorKey bool
// processArray tells unmarshal whether arrays may be traversed when decoding
// deeper than one level.
processArray bool
// documentID, when non-empty, names a key of the decoded object whose string
// value is removed from the object and stored as the event's metadata ID.
documentID string
// target is the field the decoded value is written to. nil means the source
// field itself; a pointer to "" means the decoded object is written to the
// event root.
target *string
// logger receives the debug messages emitted while processing events.
logger *logp.Logger
}
// config is the set of user-facing settings of the decode_json_fields
// processor. NewDecodeJSONFields unpacks the processor configuration into it,
// starting from defaultConfig.
type config struct {
// Fields are the event fields to decode (setting "fields").
Fields []string `config:"fields"`
// MaxDepth is the decoding depth (setting "max_depth"); the validate tag
// requires a minimum of 1.
MaxDepth int `config:"max_depth" validate:"min=1"`
// ExpandKeys maps to the "expand_keys" setting.
ExpandKeys bool `config:"expand_keys"`
// OverwriteKeys maps to the "overwrite_keys" setting.
OverwriteKeys bool `config:"overwrite_keys"`
// AddErrorKey maps to the "add_error_key" setting.
AddErrorKey bool `config:"add_error_key"`
// ProcessArray maps to the "process_array" setting.
ProcessArray bool `config:"process_array"`
// Target maps to the "target" setting. It is a pointer so that an unset
// target (nil) can be told apart from an explicitly empty one.
Target *string `config:"target"`
// DocumentID maps to the "document_id" setting.
DocumentID string `config:"document_id"`
}
var (
// defaultConfig holds the values used for settings the user does not
// provide: a decoding depth of 1 and array processing disabled.
defaultConfig = config{
MaxDepth: 1,
ProcessArray: false,
}
// errProcessingSkipped is returned by unmarshal when the decoded value is an
// array, deeper decoding was requested, and array processing is disabled.
errProcessingSkipped = errors.New("processing skipped")
)
func init() {
processors.RegisterPlugin("decode_json_fields",
checks.ConfigChecked(NewDecodeJSONFields,
checks.RequireFields("fields"),
checks.AllowedFields("fields", "max_depth", "overwrite_keys", "add_error_key", "process_array", "target", "when", "document_id", "expand_keys")))
jsprocessor.RegisterPlugin("DecodeJSONFields", NewDecodeJSONFields)
}
// NewDecodeJSONFields constructs a new decode_json_fields processor.
//
// The supplied configuration is unpacked on top of defaultConfig, so settings
// that are not provided keep their default values. If the configuration cannot
// be unpacked, a warning is logged and a wrapped error is returned together
// with a nil processor.
func NewDecodeJSONFields(c *cfg.C) (beat.Processor, error) {
	// The logger is named after this processor. It previously used the name
	// "truncate_fields", which filed this processor's log output under another
	// processor's name.
	logger := logp.NewLogger("decode_json_fields")

	// Start from the defaults; Unpack only overrides what the user configured.
	settings := defaultConfig
	if err := c.Unpack(&settings); err != nil {
		logger.Warn("Error unpacking config for decode_json_fields")
		return nil, fmt.Errorf("fail to unpack the decode_json_fields configuration: %w", err)
	}

	return &decodeJSONFields{
		fields:        settings.Fields,
		maxDepth:      settings.MaxDepth,
		expandKeys:    settings.ExpandKeys,
		overwriteKeys: settings.OverwriteKeys,
		addErrorKey:   settings.AddErrorKey,
		processArray:  settings.ProcessArray,
		documentID:    settings.DocumentID,
		target:        settings.Target,
		logger:        logger,
	}, nil
}
// Run decodes the JSON text stored in each configured field of event and
// writes the decoded value back into the event.
//
// Fields that are missing or do not hold a string are skipped silently. Every
// other failure is collected, and processing continues with the next field.
// The event is always returned; the error is non-nil when at least one failure
// was collected and consists of all failure messages joined with ", ".
func (f *decodeJSONFields) Run(event *beat.Event) (*beat.Event, error) {
	var failures []string

	for _, name := range f.fields {
		raw, getErr := event.GetValue(name)
		if getErr != nil && !errors.Is(getErr, mapstr.ErrKeyNotFound) {
			f.logger.Debugf("Error trying to GetValue for field : %s in event : %v", name, event)
			failures = append(failures, getErr.Error())
			continue
		}

		// Only string values can carry JSON text; anything else (including a
		// missing field) is left untouched.
		payload, isString := raw.(string)
		if !isString {
			continue
		}

		var decoded interface{}
		if decodeErr := unmarshal(f.maxDepth, payload, &decoded, f.processArray); decodeErr != nil {
			f.logger.Debugf("Error trying to unmarshal %s", payload)
			failures = append(failures, decodeErr.Error())
			event.SetErrorWithOption(fmt.Sprintf("parsing input as JSON: %s", decodeErr.Error()), f.addErrorKey, payload, name)
			continue
		}

		// Without a configured target the decoded value replaces the source field.
		destination := name
		if f.target != nil {
			destination = *f.target
		}

		object, isObject := decoded.(map[string]interface{})

		// Pull the document ID out of the decoded object. It is only taken (and
		// removed from the object) when the configured key holds a string.
		var docID string
		if isObject && f.documentID != "" {
			if candidate, lookupErr := mapstr.M(object).GetValue(f.documentID); lookupErr == nil {
				if s, isStr := candidate.(string); isStr {
					docID = s
					_ = mapstr.M(object).Delete(f.documentID)
				}
			}
		}

		if destination == "" {
			// An empty target writes the keys to the event root, which is only
			// possible for an object.
			if isObject {
				jsontransform.WriteJSONKeys(event, object, f.expandKeys, f.overwriteKeys, f.addErrorKey)
			} else {
				failures = append(failures, "failed to add target to root")
			}
		} else {
			if f.expandKeys {
				if isObject {
					jsontransform.ExpandFields(f.logger, event, object, f.addErrorKey)
				} else {
					failures = append(failures, "failed to expand keys")
				}
			}
			// The value is stored even when key expansion was not possible.
			if _, putErr := event.PutValue(destination, decoded); putErr != nil {
				f.logger.Debugf("Error trying to Put value %v for field : %s", decoded, name)
				failures = append(failures, putErr.Error())
				continue
			}
		}

		if docID != "" {
			if event.Meta == nil {
				event.Meta = mapstr.M{}
			}
			event.Meta[events.FieldMetaID] = docID
		}
	}

	if len(failures) == 0 {
		return event, nil
	}
	return event, errors.New(strings.Join(failures, ", "))
}
// unmarshal decodes text as JSON into fields and then, while depth remains,
// replaces string values inside the result that look like JSON objects or
// arrays with their decoded form.
//
// maxDepth counts decoding levels: the initial decode uses one level and each
// nested decode uses another. processArray controls whether arrays are
// traversed; when the decoded value is an array, depth remains, and
// processArray is false, errProcessingSkipped is returned. Any error from
// decoding text itself is returned unchanged.
func unmarshal(maxDepth int, text string, fields *interface{}, processArray bool) error {
	if err := decodeJSON(text, fields); err != nil {
		return err
	}

	remaining := maxDepth - 1
	if remaining == 0 {
		return nil
	}

	// expand tries to decode value one level deeper. The boolean reports
	// whether the caller should store the returned value in place of the
	// original.
	expand := func(value interface{}) (interface{}, bool) {
		raw, isString := value.(string)
		if !isString || !isStructured(raw) {
			return value, false
		}

		var nested interface{}
		if err := unmarshal(remaining, raw, &nested, processArray); err != nil {
			// A skipped nested value is reported as handled and returned
			// unchanged; any other failure leaves the original value alone.
			return value, errors.Is(err, errProcessingSkipped)
		}
		return nested, true
	}

	// Walk the direct children of the decoded value.
	switch container := (*fields).(type) {
	case []interface{}:
		if !processArray {
			return errProcessingSkipped
		}
		for idx := range container {
			if replacement, ok := expand(container[idx]); ok {
				container[idx] = replacement
			}
		}
	case map[string]interface{}:
		for key, value := range container {
			if replacement, ok := expand(value); ok {
				container[key] = replacement
			}
		}
	}

	return nil
}
// decodeJSON decodes text, which must contain exactly one JSON value, into to.
//
// Numbers are decoded as json.Number; when the decoded value is an object it
// is then passed to jsontransform.TransformNumbers. An error is returned when
// text is not valid JSON, when further JSON elements follow the first value,
// or when reading past the first value fails with anything other than io.EOF.
func decodeJSON(text string, to *interface{}) error {
	decoder := json.NewDecoder(strings.NewReader(text))
	decoder.UseNumber()

	if err := decoder.Decode(to); err != nil {
		return err
	}

	// Reject input that carries more than one JSON value.
	if decoder.More() {
		return errors.New("multiple json elements found")
	}

	// Anything other than a clean end of input after the value is an error.
	if _, err := decoder.Token(); err != nil && !errors.Is(err, io.EOF) {
		return err
	}

	if object, isObject := (*to).(map[string]interface{}); isObject {
		jsontransform.TransformNumbers(object)
	}
	return nil
}
// String returns a short description of the processor: the prefix
// "decode_json_fields=" followed by the configured field names separated by
// ", ".
func (f decodeJSONFields) String() string {
	const prefix = "decode_json_fields="
	names := strings.Join(f.fields, ", ")
	return prefix + names
}
// isStructured reports whether s, ignoring surrounding whitespace, is at least
// two bytes long and is delimited like a JSON array ("[" ... "]") or a JSON
// object ("{" ... "}"). It only inspects the first and last byte and does not
// validate the content in between.
func isStructured(s string) bool {
	trimmed := strings.TrimSpace(s)
	if len(trimmed) < 2 {
		return false
	}

	first, last := trimmed[0], trimmed[len(trimmed)-1]
	switch first {
	case '[':
		return last == ']'
	case '{':
		return last == '}'
	default:
		return false
	}
}