plugins/processor/pickkey/processor_pick_key.go (86 lines of code) (raw):
// Copyright 2021 iLogtail Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package pickkey
import (
"github.com/alibaba/ilogtail/pkg/pipeline"
"github.com/alibaba/ilogtail/pkg/protocol"
"github.com/alibaba/ilogtail/pkg/selfmonitor"
)
const pluginType = "processor_pick_key"
// ProcessorPickKey is picker to select or drop specific keys in LogContents
type ProcessorPickKey struct {
Include []string
Exclude []string
includeMap map[string]struct{}
excludeMap map[string]struct{}
includeLen int
excludeLen int
filterMetric selfmonitor.CounterMetric
context pipeline.Context
}
// Init called for init some system resources, like socket, mutex...
func (p *ProcessorPickKey) Init(context pipeline.Context) error {
p.context = context
metricsRecord := p.context.GetMetricRecord()
p.filterMetric = selfmonitor.NewCounterMetricAndRegister(metricsRecord, selfmonitor.MetricPluginDiscardedEventsTotal)
if len(p.Include) > 0 {
p.includeMap = make(map[string]struct{})
for _, key := range p.Include {
p.includeMap[key] = struct{}{}
}
}
p.includeLen = len(p.Include)
if len(p.Exclude) > 0 {
p.excludeMap = make(map[string]struct{})
for _, key := range p.Exclude {
p.excludeMap[key] = struct{}{}
}
}
p.excludeLen = len(p.Exclude)
return nil
}
func (*ProcessorPickKey) Description() string {
return "regex filter for logtail"
}
func (p *ProcessorPickKey) process(log *protocol.Log) bool {
beginLen := len(log.Contents)
if p.includeLen > 0 {
tmpContent := log.Contents
log.Contents = make([]*protocol.Log_Content, 0, p.includeLen)
for _, cont := range tmpContent {
if _, ok := p.includeMap[cont.Key]; ok {
log.Contents = append(log.Contents, cont)
}
}
}
if p.excludeLen > 0 {
tmpContent := log.Contents
deltaLen := len(tmpContent) - p.excludeLen
if deltaLen <= 4 {
deltaLen = 4
}
log.Contents = make([]*protocol.Log_Content, 0, deltaLen)
for _, cont := range tmpContent {
if _, ok := p.excludeMap[cont.Key]; !ok {
log.Contents = append(log.Contents, cont)
}
}
}
p.filterMetric.Add(int64(beginLen - len(log.Contents)))
return len(log.Contents) != 0
}
func (p *ProcessorPickKey) ProcessLogs(logArray []*protocol.Log) []*protocol.Log {
totalLen := len(logArray)
nextIdx := 0
for idx := 0; idx < totalLen; idx++ {
if p.process(logArray[idx]) {
if nextIdx != idx {
logArray[nextIdx] = logArray[idx]
}
nextIdx++
}
}
logArray = logArray[:nextIdx]
return logArray
}
func init() {
pipeline.Processors[pluginType] = func() pipeline.Processor {
return &ProcessorPickKey{}
}
}