plugins/processor/ratelimit/processor_rate_limit.go (75 lines of code) (raw):

// Copyright 2024 iLogtail Authors // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. package ratelimit import ( "fmt" "sort" "strings" "github.com/alibaba/ilogtail/pkg/pipeline" "github.com/alibaba/ilogtail/pkg/protocol" "github.com/alibaba/ilogtail/pkg/selfmonitor" ) type ProcessorRateLimit struct { Fields []string `comment:"Optional. Fields of value to be limited, for each unique result from combining these field values."` Limit string `comment:"Optional. Limit rate in the format of (number)/(time unit). Supported time unit: 's' (per second), 'm' (per minute), and 'h' (per hour)."` Algorithm algorithm limitMetric selfmonitor.CounterMetric context pipeline.Context } const pluginType = "processor_rate_limit" func (p *ProcessorRateLimit) Init(context pipeline.Context) error { p.context = context limit := rate{} err := limit.Unpack(p.Limit) if err != nil { return err } p.Algorithm = newTokenBucket(limit) metricsRecord := p.context.GetMetricRecord() p.limitMetric = selfmonitor.NewCounterMetricAndRegister(metricsRecord, selfmonitor.MetricPluginDiscardedEventsTotal) return nil } func (*ProcessorRateLimit) Description() string { return "rate limit processor for logtail" } // V1 func (p *ProcessorRateLimit) ProcessLogs(logArray []*protocol.Log) []*protocol.Log { totalLen := len(logArray) nextIdx := 0 for idx := 0; idx < totalLen; idx++ { key := p.makeKey(logArray[idx]) if p.Algorithm.IsAllowed(key) { if idx != nextIdx { logArray[nextIdx] = logArray[idx] } nextIdx++ } else { p.limitMetric.Add(1) } } logArray = logArray[:nextIdx] return logArray } func (p *ProcessorRateLimit) makeKey(log *protocol.Log) string { if len(p.Fields) == 0 { return "" } sort.Strings(p.Fields) values := make([]string, len(p.Fields)) for _, field := range p.Fields { exist := false for _, logContent := range log.Contents { if field == logContent.GetKey() { values = append(values, fmt.Sprintf("%v", logContent.GetValue())) exist = true break } } if !exist { values = append(values, "") } } return strings.Join(values, "_") } func init() { pipeline.Processors[pluginType] = func() pipeline.Processor { return &ProcessorRateLimit{} } }