plugins/processor/json/processor_json.go (241 lines of code) (raw):

// Copyright 2021 iLogtail Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//      http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package json

import (
	"fmt"

	"github.com/buger/jsonparser"

	"github.com/alibaba/ilogtail/pkg/logger"
	"github.com/alibaba/ilogtail/pkg/models"
	"github.com/alibaba/ilogtail/pkg/pipeline"
	"github.com/alibaba/ilogtail/pkg/protocol"
	"github.com/alibaba/ilogtail/pkg/util"
)

type ProcessorJSON struct {
	SourceKey              string
	NoKeyError             bool
	ExpandDepth            int    // Maximum expansion depth: 0 means unlimited, 1 means the current level only
	ExpandConnector        string // Connector used to join expanded keys; may be empty, defaults to "_"
	Prefix                 string // Prefix attached to every key parsed from JSON; empty by default
	KeepSource             bool   // Whether to keep the source field
	KeepSourceIfParseError bool
	UseSourceKeyAsPrefix   bool // Should SourceKey be used as prefix for all extracted keys.
	IgnoreFirstConnector   bool // Whether to omit the connector at the first level
	ExpandArray            bool // Whether to expand array values

	context pipeline.Context
}

const pluginType = "processor_json"

// Init is called to initialize system resources, such as sockets and mutexes.
func (p *ProcessorJSON) Init(context pipeline.Context) error {
	if p.SourceKey == "" {
		return fmt.Errorf("must specify SourceKey for plugin %v", pluginType)
	}
	p.context = context
	return nil
}

func (*ProcessorJSON) Description() string {
	return "json processor for logtail"
}

func (p *ProcessorJSON) ProcessLogs(logArray []*protocol.Log) []*protocol.Log {
	for _, log := range logArray {
		p.processLog(log)
	}
	return logArray
}

func (p *ProcessorJSON) processLog(log *protocol.Log) {
	findKey := false
	for idx := range log.Contents {
		if log.Contents[idx].Key == p.SourceKey {
			objectVal := log.Contents[idx].Value
			param := ExpandParam{
				sourceKey:            p.SourceKey,
				log:                  log,
				nowDepth:             0,
				maxDepth:             p.ExpandDepth,
				connector:            p.ExpandConnector,
				prefix:               p.Prefix,
				ignoreFirstConnector: p.IgnoreFirstConnector,
				expandArray:          p.ExpandArray,
			}
			if p.UseSourceKeyAsPrefix {
				param.preKey = p.SourceKey
			}
			err := jsonparser.ObjectEach([]byte(objectVal), param.ExpandJSONCallBack)
			if err != nil {
				logger.Errorf(p.context.GetRuntimeContext(), "PROCESSOR_JSON_PARSER_ALARM", "parser json error %v", err)
			}
			if !p.shouldKeepSource(err) {
				log.Contents = append(log.Contents[:idx], log.Contents[idx+1:]...)
			}
			findKey = true
			break
		}
	}
	if !findKey && p.NoKeyError {
		logger.Warningf(p.context.GetRuntimeContext(), "PROCESSOR_JSON_FIND_ALARM", "cannot find key %v", p.SourceKey)
	}
}

func (p *ProcessorJSON) shouldKeepSource(err error) bool {
	return p.KeepSource || (p.KeepSourceIfParseError && err != nil)
}

func init() {
	pipeline.Processors[pluginType] = func() pipeline.Processor {
		return &ProcessorJSON{
			SourceKey:              "",
			NoKeyError:             true,
			ExpandDepth:            0,
			ExpandConnector:        "_",
			Prefix:                 "",
			KeepSource:             true,
			KeepSourceIfParseError: true,
			UseSourceKeyAsPrefix:   false,
			ExpandArray:            false,
		}
	}
}

type ExpandParam struct {
	sourceKey              string
	log                    *protocol.Log
	contents               models.LogContents
	preKey                 string
	nowDepth               int
	maxDepth               int
	connector              string
	prefix                 string
	ignoreFirstConnector   bool
	isSourceKeyOverwritten bool
	expandArray            bool
}

func (p *ExpandParam) getConnector(depth int) string {
	if depth == 1 && p.ignoreFirstConnector {
		return ""
	}
	return p.connector
}

func (p *ExpandParam) ExpandJSONCallBack(key []byte, value []byte, dataType jsonparser.ValueType, _ int) error {
	p.nowDepth++
	switch dataType {
	case jsonparser.Object:
		p.flattenObject(key, value)
	case jsonparser.Array:
		p.flattenArray(key, value)
	default:
		p.flattenValue(key, value)
	}
	p.nowDepth--
	return nil
}

func (p *ExpandParam) flattenObject(key []byte, value []byte) {
	if p.nowDepth == p.maxDepth {
		// If max depth is reached, add the object directly to the result
		newKey := p.preKey + p.getConnector(p.nowDepth) + (string)(key)
		p.appendNewContent(newKey, (string)(value))
		return
	}
	backKey := p.preKey
	p.preKey = p.preKey + p.getConnector(p.nowDepth) + (string)(key)
	_ = jsonparser.ObjectEach(value, p.ExpandJSONCallBack)
	p.preKey = backKey
}

func (p *ExpandParam) flattenArray(key []byte, value []byte) {
	defaultKey := p.preKey + p.getConnector(p.nowDepth) + (string)(key)
	if !p.expandArray || p.nowDepth == p.maxDepth {
		// If arrays are not expanded, or max depth is reached, add the array directly to the result
		p.appendNewContent(defaultKey, (string)(value))
		return
	}
	index := 0
	_, _ = jsonparser.ArrayEach(value, func(val []byte, dataType jsonparser.ValueType, offset int, err error) {
		newKey := make([]byte, len(key), len(key)+10)
		copy(newKey, key)
		newKey = append(newKey, fmt.Sprintf("[%d]", index)...)
		if dataType == jsonparser.Object {
			p.flattenObject(newKey, val)
		} else {
			p.flattenValue(newKey, val)
		}
		index++
	})
}

func (p *ExpandParam) flattenValue(key []byte, value []byte) {
	// If the current value is neither a JSON object nor a JSON array, add it directly to the result
	newKey := p.preKey + p.getConnector(p.nowDepth) + (string)(key)
	if strValue, err := jsonparser.ParseString(value); err == nil {
		p.appendNewContent(newKey, strValue)
	} else {
		p.appendNewContent(newKey, (string)(value))
	}
}

// appendNewContent dispatches to the v1 path (protocol.Log) when log is set,
// otherwise to the v2 path (models.LogContents).
func (p *ExpandParam) appendNewContent(key string, value string) {
	if p.log != nil {
		p.appendNewContentV1(key, value)
	} else {
		p.appendNewContentV2(key, value)
	}
}

func (p *ExpandParam) appendNewContentV1(key string, value string) {
	if len(p.prefix) > 0 {
		newContent := &protocol.Log_Content{
			Key:   p.prefix + key,
			Value: value,
		}
		p.log.Contents = append(p.log.Contents, newContent)
	} else {
		newContent := &protocol.Log_Content{
			Key:   key,
			Value: value,
		}
		p.log.Contents = append(p.log.Contents, newContent)
	}
}

func (p *ExpandParam) appendNewContentV2(key string, value string) {
	if len(p.prefix) > 0 {
		key = p.prefix + key
	}
	p.contents.Add(key, value)
	if key == p.sourceKey {
		p.isSourceKeyOverwritten = true
	}
}

// Process handles v2 pipeline events (models.PipelineGroupEvents).
func (p *ProcessorJSON) Process(in *models.PipelineGroupEvents, context pipeline.PipelineContext) {
	for _, event := range in.Events {
		p.processEvent(event)
	}
	context.Collector().Collect(in.Group, in.Events...)
}

func (p *ProcessorJSON) processEvent(event models.PipelineEvent) {
	if event.GetType() != models.EventTypeLogging {
		return
	}
	contents := event.(*models.Log).GetIndices()
	if !contents.Contains(p.SourceKey) {
		if p.NoKeyError {
			logger.Warningf(p.context.GetRuntimeContext(), "PROCESSOR_JSON_FIND_ALARM", "cannot find key %v", p.SourceKey)
		}
		return
	}
	objectVal := contents.Get(p.SourceKey)
	bytesVal, ok := objectVal.([]byte)
	if !ok {
		var stringVal string
		stringVal, ok = objectVal.(string)
		bytesVal = util.ZeroCopyStringToBytes(stringVal)
	}
	if !ok {
		logger.Warningf(p.context.GetRuntimeContext(), "PROCESSOR_JSON_FIND_ALARM", "key %v is not string", p.SourceKey)
		return
	}
	param := ExpandParam{
		sourceKey:            p.SourceKey,
		contents:             contents,
		nowDepth:             0,
		maxDepth:             p.ExpandDepth,
		connector:            p.ExpandConnector,
		prefix:               p.Prefix,
		ignoreFirstConnector: p.IgnoreFirstConnector,
		expandArray:          p.ExpandArray,
	}
	if p.UseSourceKeyAsPrefix {
		param.preKey = p.SourceKey
	}
	err := jsonparser.ObjectEach(bytesVal, param.ExpandJSONCallBack)
	if err != nil {
		logger.Errorf(p.context.GetRuntimeContext(), "PROCESSOR_JSON_PARSER_ALARM", "parser json error %v", err)
	}
	if !p.shouldKeepSource(err) && !param.isSourceKeyOverwritten {
		contents.Delete(p.SourceKey)
	}
}
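
Usage sketch (not part of the original file): a minimal, hedged example of driving the v1 ProcessLogs path above. The import paths are assumptions inferred from the file path and the import block; Init is skipped, which is only safe here because the key is present and the JSON parses cleanly, so p.context is never dereferenced.

package main

import (
	"fmt"

	"github.com/alibaba/ilogtail/pkg/protocol"
	processorjson "github.com/alibaba/ilogtail/plugins/processor/json" // assumed import path
)

func main() {
	// Mirrors the factory defaults that matter here; IgnoreFirstConnector avoids a leading "_".
	p := &processorjson.ProcessorJSON{
		SourceKey:            "content",
		ExpandConnector:      "_",
		KeepSource:           true,
		IgnoreFirstConnector: true,
	}
	log := &protocol.Log{Contents: []*protocol.Log_Content{
		{Key: "content", Value: `{"a":{"b":"1"},"c":"2"}`},
	}}
	for _, kv := range p.ProcessLogs([]*protocol.Log{log})[0].Contents {
		fmt.Println(kv.Key, "=", kv.Value)
	}
	// Expected: the retained "content" field plus the flattened keys a_b=1 and c=2.
}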