plugins/processor/fieldswithcondition/processor_fields_with_condition.go (254 lines of code) (raw):

// Copyright 2021 iLogtail Authors // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. package fieldswithcondition import ( "fmt" "regexp" "strings" "github.com/alibaba/ilogtail/pkg/logger" "github.com/alibaba/ilogtail/pkg/pipeline" "github.com/alibaba/ilogtail/pkg/protocol" "github.com/alibaba/ilogtail/pkg/selfmonitor" ) const ( PluginName = "processor_fields_with_condition" RelationOperatorEquals = "equals" RelationOperatorRegexp = "regexp" RelationOperatorContains = "contains" RelationOperatorStartwith = "startwith" LogicalOperatorAnd = "and" LogicalOperatorOr = "or" ActionAddFieldsType = "processor_add_fields" ActionDropType = "processor_drop" ) type ProcessorFieldsWithCondition struct { DropIfNotMatchCondition bool `comment:"Optional. When the case condition is not met, whether the log is discarded (true) or retained (false)"` Switch []Condition `comment:"The switch-case conditions"` filterMetric selfmonitor.CounterMetric context pipeline.Context } type Condition struct { Case ConditionCase `comment:"The condition that log data satisfies"` Actions []map[string]interface{} `comment:"The action that executes when the case condition is met"` actions []ConditionAction } type FieldApply func(reg *regexp.Regexp, conditionContent string, logContent string) bool type Field struct { apply FieldApply reg *regexp.Regexp conditionContent string } type ConditionCase struct { LogicalOperator string `comment:"Optional. The Logical operators between multiple conditional fields, alternate values are and/or"` RelationOperator string `comment:"Optional. The Relational operators for conditional fields, alternate values are equals/regexp/contains/startwith"` FieldConditions map[string]string `comment:"The key-value pair of field names and expressions"` fieldConditionFields map[string]Field } type ConditionAction struct { Type string `comment:"action type. alternate values are processor_add_fields/processor_drop"` IgnoreIfExist bool `comment:"Optional. Whether to ignore when the same key exists"` Fields map[string]interface{} `comment:"The appending fields"` DropKeys []interface{} `comment:"The dropping fields"` dropkeyDictionary map[string]interface{} } // Init called for init some system resources, like socket, mutex... func (p *ProcessorFieldsWithCondition) Init(context pipeline.Context) error { p.context = context for i := range p.Switch { // set default values relationOpertor := p.Switch[i].Case.RelationOperator switch relationOpertor { case RelationOperatorEquals: case RelationOperatorRegexp: case RelationOperatorContains: case RelationOperatorStartwith: default: if len(relationOpertor) > 0 { logger.Warning(p.context.GetRuntimeContext(), "CONDITION_INIT_ALARM", "init relationOpertor error, relationOpertor", relationOpertor) } relationOpertor = RelationOperatorEquals p.Switch[i].Case.RelationOperator = relationOpertor } logicalOperator := p.Switch[i].Case.LogicalOperator switch logicalOperator { case LogicalOperatorAnd: case LogicalOperatorOr: default: if len(logicalOperator) > 0 { logger.Warning(p.context.GetRuntimeContext(), "CONDITION_INIT_ALARM", "init logicalOperator error, logicalOperator", logicalOperator) } p.Switch[i].Case.LogicalOperator = LogicalOperatorAnd } if p.Switch[i].Case.FieldConditions != nil { p.Switch[i].Case.fieldConditionFields = make(map[string]Field) for key, val := range p.Switch[i].Case.FieldConditions { switch relationOpertor { case RelationOperatorRegexp: reg, err := regexp.Compile(val) if err != nil { logger.Warning(p.context.GetRuntimeContext(), "CONDITION_INIT_ALARM", "init condition regex error, key", key, "regex", val, "error", err) return err } p.Switch[i].Case.fieldConditionFields[key] = Field{ reg: reg, conditionContent: val, apply: func(reg *regexp.Regexp, conditionContent string, logContent string) bool { return reg.MatchString(logContent) }, } case RelationOperatorContains: p.Switch[i].Case.fieldConditionFields[key] = Field{ reg: nil, conditionContent: val, apply: func(reg *regexp.Regexp, conditionContent string, logContent string) bool { return strings.Contains(logContent, conditionContent) }, } case RelationOperatorStartwith: p.Switch[i].Case.fieldConditionFields[key] = Field{ reg: nil, conditionContent: val, apply: func(reg *regexp.Regexp, conditionContent string, logContent string) bool { return strings.HasPrefix(logContent, conditionContent) }, } default: p.Switch[i].Case.fieldConditionFields[key] = Field{ reg: nil, conditionContent: val, apply: func(reg *regexp.Regexp, conditionContent string, logContent string) bool { return logContent == conditionContent }, } } } } if p.Switch[i].Actions != nil { p.Switch[i].actions = make([]ConditionAction, len(p.Switch[i].Actions)) for j := range p.Switch[i].Actions { action := p.Switch[i].Actions[j] if typeName, ok := action["type"]; ok { switch typeName { case ActionAddFieldsType: IgnoreIfExist := false if ignoreIfExist, ok := action["IgnoreIfExist"]; ok { if dropKeysArrInner, ok := ignoreIfExist.(bool); ok { IgnoreIfExist = dropKeysArrInner } } AddFields := make(map[string]interface{}) if addFields, ok := action["Fields"]; ok { if addFieldsInner, ok := addFields.(map[string]interface{}); ok { AddFields = addFieldsInner } } p.Switch[i].actions[j] = ConditionAction{ Type: ActionAddFieldsType, IgnoreIfExist: IgnoreIfExist, Fields: AddFields, } case ActionDropType: if dropKeys, ok := action["DropKeys"]; ok { if dropKeysArr, ok := dropKeys.([]interface{}); ok { dropkeyDictionary := make(map[string]interface{}) for _, dropKey := range dropKeysArr { dropkeyDictionary[dropKey.(string)] = true } p.Switch[i].actions[j] = ConditionAction{ Type: ActionDropType, DropKeys: dropKeysArr, dropkeyDictionary: dropkeyDictionary, } } } default: logger.Error(p.context.GetRuntimeContext(), "CONDITION_INIT_ALARM", "init condition action type error, type", typeName) return fmt.Errorf("init condition action type error,type is %v", typeName) } } } } } metricsRecord := p.context.GetMetricRecord() p.filterMetric = selfmonitor.NewCounterMetricAndRegister(metricsRecord, selfmonitor.MetricPluginDiscardedEventsTotal) return nil } func (*ProcessorFieldsWithCondition) Description() string { return "Processor to match multiple conditions, and if one of the conditions is met, the corresponding action is performed." } // Match single case func (p *ProcessorFieldsWithCondition) isCaseMatch(log *protocol.Log, conditionCase ConditionCase) bool { if conditionCase.fieldConditionFields != nil { matchedCount := 0 for _, cont := range log.Contents { if field, ok := conditionCase.fieldConditionFields[cont.Key]; ok { if field.apply(field.reg, field.conditionContent, cont.Value) { matchedCount++ } } } if conditionCase.LogicalOperator == LogicalOperatorAnd && matchedCount == len(conditionCase.fieldConditionFields) { return true } if conditionCase.LogicalOperator == LogicalOperatorOr && matchedCount > 0 { return true } } return false } // Process single action func (p *ProcessorFieldsWithCondition) processAction(log *protocol.Log, conditionAction ConditionAction) { if conditionAction.Type == ActionAddFieldsType { dict := make(map[string]bool) for idx := range log.Contents { dict[log.Contents[idx].Key] = true } for k, v := range conditionAction.Fields { if _, exists := dict[k]; conditionAction.IgnoreIfExist && exists { continue } newContent := &protocol.Log_Content{ Key: k, Value: v.(string), } log.Contents = append(log.Contents, newContent) } } else if conditionAction.Type == ActionDropType { for idx := len(log.Contents) - 1; idx >= 0; idx-- { if _, exists := conditionAction.dropkeyDictionary[log.Contents[idx].Key]; exists { log.Contents = append(log.Contents[:idx], log.Contents[idx+1:]...) } } } } // Match different cases func (p *ProcessorFieldsWithCondition) MatchAndProcess(log *protocol.Log) bool { if p.Switch != nil { for _, condition := range p.Switch { if p.isCaseMatch(log, condition.Case) { if condition.actions != nil { for i := range condition.actions { action := condition.actions[i] p.processAction(log, action) } } return true } } } return false } func (p *ProcessorFieldsWithCondition) ProcessLogs(logArray []*protocol.Log) []*protocol.Log { totalLen := len(logArray) nextIdx := 0 for idx := 0; idx < totalLen; idx++ { if p.MatchAndProcess(logArray[idx]) || !p.DropIfNotMatchCondition { if idx != nextIdx { logArray[nextIdx] = logArray[idx] } nextIdx++ } else { p.filterMetric.Add(1) } } logArray = logArray[:nextIdx] return logArray } func init() { pipeline.Processors[PluginName] = func() pipeline.Processor { return &ProcessorFieldsWithCondition{} } }