plugins/processor/stringreplace/processor_string_replace.go (105 lines of code) (raw):

// Copyright 2023 iLogtail Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//      http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package stringreplace

import (
	"errors"
	"strconv"
	"strings"

	"github.com/dlclark/regexp2"

	"github.com/alibaba/ilogtail/pkg/logger"
	"github.com/alibaba/ilogtail/pkg/pipeline"
	"github.com/alibaba/ilogtail/pkg/protocol"
	"github.com/alibaba/ilogtail/pkg/selfmonitor"
)

// ProcessorStringReplace rewrites the value of one log content using a
// constant substring replacement, a regular-expression replacement, or by
// unquoting the value. The exported fields are the plugin's configuration.
type ProcessorStringReplace struct {
	// SourceKey is the key of the log content to process. Init rejects an
	// empty SourceKey, and ProcessLogs only touches contents whose Key equals
	// it.
	SourceKey string

	// Method selects the transformation: MethodRegex, MethodConst or
	// MethodUnquote. Init rejects any other value.
	Method string

	// Match is the literal substring (MethodConst) or the pattern
	// (MethodRegex) to look for. Init requires it to be non-empty for
	// MethodConst.
	Match string

	// ReplaceString is the replacement text used by MethodConst and
	// MethodRegex.
	ReplaceString string

	// DestKey, when non-empty, makes ProcessLogs append the result as a new
	// content with this key instead of overwriting the source content's value.
	DestKey string

	// re is the compiled form of Match; Init sets it only for MethodRegex.
	re *regexp2.Regexp

	// context is the pipeline context handed to Init; it is used for logging
	// and for obtaining the metric record.
	context pipeline.Context

	// logPairMetric receives, once per ProcessLogs call, the number of
	// contents that were processed.
	logPairMetric selfmonitor.CounterMetric
}

const (
	// PluginName is the key under which init registers this processor in
	// pipeline.Processors.
	PluginName = "processor_string_replace"

	// MethodRegex replaces every match of the Match pattern with ReplaceString.
	MethodRegex = "regex"
	// MethodConst replaces every occurrence of the literal Match with
	// ReplaceString.
	MethodConst = "const"
	// MethodUnquote interprets the escape sequences in the value via
	// strconv.Unquote.
	MethodUnquote = "unquote"
)

// errNoMethod is returned by Init when Method is not one of the Method*
// constants.
var errNoMethod = errors.New("no method error")

// errNoMatch is returned by Init when Method is MethodConst and Match is empty.
var errNoMatch = errors.New("no match error")

// errNoSourceKey is returned by Init when SourceKey is empty.
var errNoSourceKey = errors.New("no source key error")

// Init is called to initialize system resources, such as sockets and mutexes.
func (p *ProcessorStringReplace) Init(context pipeline.Context) error { p.context = context if len(p.SourceKey) == 0 { return errNoSourceKey } var err error switch p.Method { case MethodConst: if len(p.Match) == 0 { return errNoMatch } case MethodRegex: p.re, err = regexp2.Compile(p.Match, regexp2.RE2) if err != nil { logger.Error(p.context.GetRuntimeContext(), "PROCESSOR_INIT_ALARM", "init regex error", err, "regex", p.Match) return err } case MethodUnquote: default: return errNoMethod } metricsRecord := p.context.GetMetricRecord() p.logPairMetric = selfmonitor.NewAverageMetricAndRegister(metricsRecord, selfmonitor.PluginPairsPerLogTotal) return nil } func (*ProcessorStringReplace) Description() string { return "regex replace processor for logtail" } func (p *ProcessorStringReplace) ProcessLogs(logArray []*protocol.Log) []*protocol.Log { replaceCount := 0 for _, log := range logArray { for _, cont := range log.Contents { if p.SourceKey != cont.Key { continue } var newContVal string var err error switch p.Method { case MethodConst: newContVal = strings.ReplaceAll(cont.Value, p.Match, p.ReplaceString) case MethodRegex: if ok, _ := p.re.MatchString(cont.Value); ok { newContVal, err = p.re.Replace(cont.Value, p.ReplaceString, -1, -1) } else { newContVal = cont.Value } case MethodUnquote: if strings.HasPrefix(cont.Value, "\"") && strings.HasSuffix(cont.Value, "\"") { newContVal, err = strconv.Unquote(cont.Value) } else { newContVal, err = strconv.Unquote("\"" + strings.ReplaceAll(cont.Value, "\"", "\\x22") + "\"") } default: newContVal = cont.Value } if err != nil { logger.Error(p.context.GetRuntimeContext(), "PROCESSOR_INIT_ALARM", "process log error", err) newContVal = cont.Value } if len(p.DestKey) > 0 { log.Contents = append(log.Contents, &protocol.Log_Content{Key: p.DestKey, Value: newContVal}) } else { cont.Value = newContVal } replaceCount++ } } p.logPairMetric.Add(int64(replaceCount)) return logArray } func init() { pipeline.Processors[PluginName] = func() 
pipeline.Processor { return &ProcessorStringReplace{} } }