plugins/processor/anchor/anchor.go (181 lines of code) (raw):

// Copyright 2021 iLogtail Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//      http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package anchor

import (
	"strings"

	"github.com/alibaba/ilogtail/pkg/logger"
	"github.com/alibaba/ilogtail/pkg/pipeline"
	"github.com/alibaba/ilogtail/pkg/protocol"
	"github.com/alibaba/ilogtail/pkg/selfmonitor"
	"github.com/alibaba/ilogtail/pkg/util"

	"github.com/buger/jsonparser"
)

const (
	// StringType is the internal type of an anchor whose substring is
	// inserted into the log as-is.
	StringType = 1
	// JSONType is the internal type of an anchor whose substring is parsed
	// as a JSON object.
	JSONType = 2
	// StringtypeStr is the FieldType value that selects StringType.
	StringtypeStr = "string"
	// JsontypeStr is the FieldType value that selects JSONType.
	JsontypeStr = "json"
)

// Anchor is used to locate a substring with Start and Stop specified.
// The substring will be inserted into log with FieldName and FieldType.
// ExpondJSON indicates if the substring is JSON type, if this flag is
// set, the substring will be expanded to multiple key/value pairs, and
// values of pairs will be inserted into log with key prepended FieldName
// and ExpondConnecter, such as fn_key1, fn_key2.
// MaxExpondDepth is used to specify the depth to expand; 0 is replaced by
// a limit of 100 in Init, and 1 disables expansion.
//
// Note: Expond is a typo, but because this feature has already been
// published, the typo is kept for compatibility.
type Anchor struct {
	// Start is the marker that precedes the substring; an empty Start
	// matches at the beginning of the value.
	Start string
	// Stop is the marker that follows the substring; an empty Stop extends
	// the substring to the end of the value.
	Stop string
	// FieldName is the key (or key prefix for JSON) of the inserted contents.
	FieldName string
	// FieldType is "string" or "json"; any other value is treated as "string".
	FieldType string
	// ExpondJSON enables recursive expansion of nested JSON objects.
	ExpondJSON bool
	// IgnoreJSONError suppresses the warning logged when JSON parsing fails.
	IgnoreJSONError bool
	// ExpondConnecter joins key segments; Init defaults it to "_" for JSON anchors.
	ExpondConnecter string
	// MaxExpondDepth bounds the nesting depth that is expanded.
	MaxExpondDepth int
	// innerType is the numeric form of FieldType, resolved in Init.
	innerType int
}

// ProcessorAnchor is a processor plugin to process field with anchors.
// Field specified by SourceKey will be processed by all Anchors.
// If no SourceKey is specified, the first field in log contents will be processed.
type ProcessorAnchor struct { Anchors []Anchor NoKeyError bool NoAnchorError bool SourceKey string KeepSource bool context pipeline.Context logPairMetric selfmonitor.CounterMetric } // Init called for init some system resources, like socket, mutex... func (p *ProcessorAnchor) Init(context pipeline.Context) error { p.context = context for i := range p.Anchors { switch p.Anchors[i].FieldType { case StringtypeStr: p.Anchors[i].innerType = StringType case JsontypeStr: p.Anchors[i].innerType = JSONType if len(p.Anchors[i].ExpondConnecter) == 0 { p.Anchors[i].ExpondConnecter = "_" } // if max expond depth is 1, this is no expond if p.Anchors[i].MaxExpondDepth == 1 { p.Anchors[i].ExpondJSON = false } else if p.Anchors[i].MaxExpondDepth == 0 { p.Anchors[i].MaxExpondDepth = 100 } default: p.Anchors[i].innerType = StringType } } metricsRecord := p.context.GetMetricRecord() p.logPairMetric = selfmonitor.NewAverageMetricAndRegister(metricsRecord, selfmonitor.PluginPairsPerLogTotal) return nil } func (*ProcessorAnchor) Description() string { return "anchor processor for logtail" } func (p *ProcessorAnchor) ProcessLogs(logArray []*protocol.Log) []*protocol.Log { for _, log := range logArray { p.ProcessLog(log) } return logArray } func (p *ProcessorAnchor) ProcessLog(log *protocol.Log) { beginLen := len(log.Contents) findKey := false for i, cont := range log.Contents { if len(p.SourceKey) == 0 || p.SourceKey == cont.Key { findKey = true if !p.KeepSource { log.Contents = append(log.Contents[:i], log.Contents[i+1:]...) 
} p.ProcessAnchor(log, &cont.Value) break } } if !findKey && p.NoKeyError { logger.Warning(p.context.GetRuntimeContext(), "ANCHOR_FIND_ALARM", "anchor cannot find key", p.SourceKey) } p.logPairMetric.Add(int64(len(log.Contents) - beginLen + 1)) } type ExpondParam struct { log *protocol.Log preKey string nowDepth int maxDepth int connector string } func (p *ExpondParam) ExpondJSONCallBack(key []byte, value []byte, dataType jsonparser.ValueType, offset int) error { p.nowDepth++ if p.nowDepth == p.maxDepth || dataType != jsonparser.Object { if dataType == jsonparser.String { // unescape string if strValue, err := jsonparser.ParseString(value); err == nil { p.log.Contents = append(p.log.Contents, &protocol.Log_Content{ Key: p.preKey + p.connector + (string)(key), Value: strValue, }) } else { p.log.Contents = append(p.log.Contents, &protocol.Log_Content{ Key: p.preKey + p.connector + (string)(key), Value: (string)(value), }) } } else { p.log.Contents = append(p.log.Contents, &protocol.Log_Content{ Key: p.preKey + p.connector + (string)(key), Value: (string)(value), }) } } else { backKey := p.preKey p.preKey = p.preKey + p.connector + (string)(key) _ = jsonparser.ObjectEach(value, p.ExpondJSONCallBack) p.preKey = backKey } p.nowDepth-- return nil } func (p *ProcessorAnchor) ProcessAnchor(log *protocol.Log, val *string) { for _, anchor := range p.Anchors { // Start is "", startIndex is 0 startIndex := strings.Index(*val, anchor.Start) if startIndex < 0 { if p.NoAnchorError { logger.Warning(p.context.GetRuntimeContext(), "ANCHOR_FIND_ALARM", "anchor no start", anchor.Start, "content", util.CutString(*val, 1024)) } continue } startIndex += len(anchor.Start) stopIndex := strings.Index((*val)[startIndex:], anchor.Stop) if stopIndex < 0 { if p.NoAnchorError { logger.Warning(p.context.GetRuntimeContext(), "ANCHOR_FIND_ALARM", "anchor no stop", anchor.Stop, "content", util.CutString(*val, 1024)) } continue } else { if len(anchor.Stop) == 0 { stopIndex = len(*val) } else { 
stopIndex += startIndex } } switch anchor.innerType { case StringType: log.Contents = append(log.Contents, &protocol.Log_Content{Key: anchor.FieldName, Value: (*val)[startIndex:stopIndex]}) case JSONType: var err error if anchor.ExpondJSON { param := ExpondParam{ log: log, preKey: anchor.FieldName, nowDepth: 0, maxDepth: anchor.MaxExpondDepth, connector: anchor.ExpondConnecter, } err = jsonparser.ObjectEach(([]byte)((*val)[startIndex:stopIndex]), param.ExpondJSONCallBack) } else { err = jsonparser.ObjectEach(([]byte)((*val)[startIndex:stopIndex]), func(key []byte, value []byte, dataType jsonparser.ValueType, offset int) error { log.Contents = append(log.Contents, &protocol.Log_Content{ Key: anchor.FieldName + anchor.ExpondConnecter + (string)(key), Value: (string)(value), }) return nil }) } if err != nil && !anchor.IgnoreJSONError { logger.Warning(p.context.GetRuntimeContext(), "ANCHOR_JSON_ALARM", "process json error", err, "content", (*val)[startIndex:stopIndex]) } } } } func init() { pipeline.Processors["processor_anchor"] = func() pipeline.Processor { return &ProcessorAnchor{} } }