plugins/processor/regex/regex.go (104 lines of code) (raw):

// Copyright 2021 iLogtail Authors // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. package regex import ( "errors" "regexp" "github.com/alibaba/ilogtail/pkg/logger" "github.com/alibaba/ilogtail/pkg/pipeline" "github.com/alibaba/ilogtail/pkg/protocol" "github.com/alibaba/ilogtail/pkg/selfmonitor" "github.com/alibaba/ilogtail/pkg/util" ) // ProcessorRegex is a processor plugin to process field with regex. // It uses Regex to parse the field specified by SourceKey, and insert results with Keys. // If no SourceKey is specified, the first field in log contents will be parsed. // Note: use `()` to encase values to extract in Regex. type ProcessorRegex struct { Regex string Keys []string FullMatch bool NoKeyError bool NoMatchError bool KeepSource bool KeepSourceIfParseError bool SourceKey string context pipeline.Context logPairMetric selfmonitor.CounterMetric re *regexp.Regexp } var errNoRegexKey = errors.New("no regex key error") // Init called for init some system resources, like socket, mutex... func (p *ProcessorRegex) Init(context pipeline.Context) error { p.context = context if len(p.Keys) == 0 { return errNoRegexKey } var err error // `(?s)` change the meaning of `.` in Golang to match the every character, and the default meaning is not match a newline. p.re, err = regexp.Compile("(?s)" + p.Regex) if err != nil { logger.Error(p.context.GetRuntimeContext(), "PROCESSOR_INIT_ALARM", "init regex error", err, "regex", p.Regex) return err } metricsRecord := p.context.GetMetricRecord() p.logPairMetric = selfmonitor.NewAverageMetricAndRegister(metricsRecord, selfmonitor.PluginPairsPerLogTotal) return nil } func (*ProcessorRegex) Description() string { return "regex processor for logtail" } func (p *ProcessorRegex) ProcessLogs(logArray []*protocol.Log) []*protocol.Log { if p.re == nil { return logArray } for _, log := range logArray { p.ProcessLog(log) } return logArray } func (p *ProcessorRegex) ProcessLog(log *protocol.Log) { beginLen := len(log.Contents) findKey := false for i, cont := range log.Contents { if len(p.SourceKey) == 0 || p.SourceKey == cont.Key { findKey = true parseResult := p.processRegex(log, &cont.Value) if !p.shouldKeepSource(parseResult) { log.Contents = append(log.Contents[:i], log.Contents[i+1:]...) } break } } if !findKey && p.NoKeyError { logger.Warning(p.context.GetRuntimeContext(), "REGEX_FIND_ALARM", "anchor cannot find key", p.SourceKey) } p.logPairMetric.Add(int64(len(log.Contents) - beginLen + 1)) } func (p *ProcessorRegex) shouldKeepSource(parseResult bool) bool { return p.KeepSource || (p.KeepSourceIfParseError && !parseResult) } func (p *ProcessorRegex) processRegex(log *protocol.Log, val *string) bool { indexArray := p.re.FindStringSubmatchIndex(*val) if len(indexArray) < 2 || (p.FullMatch && (indexArray[0] != 0 || indexArray[1] != len(*val))) { if p.NoMatchError { logger.Warning(p.context.GetRuntimeContext(), "REGEX_UNMATCHED_ALARM", "unmatch this log content", util.CutString(*val, 512)) } return false } // Use bitwise operations to ignore first two values in indexArray. if (len(indexArray)>>1 - 1) < len(p.Keys) { if p.NoMatchError { logger.Warning(p.context.GetRuntimeContext(), "REGEX_UNMATCHED_ALARM", "match result count less than key count, result count", len(indexArray)>>1-1, "key count", len(p.Keys)) } return false } for i := 0; i < len(p.Keys); i++ { leftIndex := indexArray[i<<1+2] rightIndex := indexArray[i<<1+3] if leftIndex >= 0 && rightIndex >= leftIndex { log.Contents = append(log.Contents, &protocol.Log_Content{Key: p.Keys[i], Value: (*val)[leftIndex:rightIndex]}) } } return true } func init() { pipeline.Processors["processor_regex"] = func() pipeline.Processor { return &ProcessorRegex{ FullMatch: false, NoMatchError: true, KeepSourceIfParseError: true, } } }