plugins/processor/droplastkey/processor_drop_last_key.go (64 lines of code) (raw):

// Copyright 2021 iLogtail Authors // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. package droplastkey import ( "fmt" "github.com/alibaba/ilogtail/pkg/pipeline" "github.com/alibaba/ilogtail/pkg/protocol" "github.com/alibaba/ilogtail/pkg/selfmonitor" ) // ProcessorDropLastKey is used to drop log content when process done type ProcessorDropLastKey struct { Include []string DropKey string includeMap map[string]struct{} filterMetric selfmonitor.CounterMetric context pipeline.Context } // Init called for init some system resources, like socket, mutex... func (p *ProcessorDropLastKey) Init(context pipeline.Context) error { p.context = context metricsRecord := p.context.GetMetricRecord() p.filterMetric = selfmonitor.NewCounterMetricAndRegister(metricsRecord, selfmonitor.MetricPluginDiscardedEventsTotal) if len(p.DropKey) == 0 { return fmt.Errorf("Invalid config, DropKey is empty") } if len(p.Include) > 0 { p.includeMap = make(map[string]struct{}) for _, key := range p.Include { p.includeMap[key] = struct{}{} } } else { return fmt.Errorf("Invalid config, Include is empty") } return nil } func (*ProcessorDropLastKey) Description() string { return "processor_drop_last_key is used to drop log content when process done" } func (p *ProcessorDropLastKey) process(log *protocol.Log) { dropFlag := false for _, content := range log.Contents { if _, ok := p.includeMap[content.Key]; ok { dropFlag = true break } } if !dropFlag { return } for index, content := range log.Contents { if content.Key == p.DropKey { log.Contents = append(log.Contents[:index], log.Contents[index+1:]...) p.filterMetric.Add(1) break } } } func (p *ProcessorDropLastKey) ProcessLogs(logArray []*protocol.Log) []*protocol.Log { for _, log := range logArray { p.process(log) } return logArray } func init() { pipeline.Processors["processor_drop_last_key"] = func() pipeline.Processor { return &ProcessorDropLastKey{} } }