processor/ratelimitprocessor/config.go (104 lines of code) (raw):
// Licensed to Elasticsearch B.V. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package ratelimitprocessor // import "github.com/elastic/opentelemetry-collector-components/processor/ratelimitprocessor"
import (
"errors"
"fmt"
"sort"
"strings"
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/config/configgrpc"
"go.opentelemetry.io/collector/confmap"
"github.com/elastic/opentelemetry-collector-components/processor/ratelimitprocessor/internal/gubernator"
)
// msgEmptyField is the format string for errors reporting a configuration
// field whose value is empty. Its single %s verb takes the field's name.
const msgEmptyField = "%s field value is empty"
// Config holds configuration for the ratelimit processor.
//
// Rate and Burst have no defaults and must both be greater than
// zero for Validate to succeed.
type Config struct {
// Gubernator holds configuration for Gubernator,
// to control distributed rate limiting.
//
// If Gubernator is nil, then rate limiting is performed
// locally to the collector.
Gubernator *GubernatorConfig `mapstructure:"gubernator"`
// MetadataKeys holds a list of client metadata keys for
// defining the rate limiting key, in addition to the
// processor ID.
MetadataKeys []string `mapstructure:"metadata_keys"`
// Strategy holds the rate limiting strategy: one of
// "requests", "records", or "bytes".
//
// Defaults to "requests".
Strategy Strategy `mapstructure:"strategy"`
// Rate holds bucket refill rate, in tokens per second.
//
// Must be greater than zero.
Rate int `mapstructure:"rate"`
// Burst holds the maximum capacity of rate limit buckets.
//
// Must be greater than zero.
Burst int `mapstructure:"burst"`
// ThrottleBehavior holds the behavior when rate limit is exceeded:
// either "error" or "delay".
//
// Defaults to "error".
ThrottleBehavior ThrottleBehavior `mapstructure:"throttle_behavior"`
}
// Strategy identifies the rate-limiting strategy: requests, records, or bytes.
//
// The StrategyRateLimit* constants below are the only values accepted
// by Validate.
type Strategy string
const (
// StrategyRateLimitRequests identifies the strategy for
// rate limiting by request.
StrategyRateLimitRequests Strategy = "requests"
// StrategyRateLimitRecords identifies the strategy for
// rate limiting by record: log record, span, metric
// data point, or profile sample.
StrategyRateLimitRecords Strategy = "records"
// StrategyRateLimitBytes identifies the strategy for
// rate limiting by number of bytes.
//
// NOTE: measuring the size of data in bytes is much more
// expensive than counting the number of requests and
// records. Bear in mind that this strategy may impact
// CPU and memory usage.
StrategyRateLimitBytes Strategy = "bytes"
)
// ThrottleBehavior identifies the behavior when rate limit is exceeded.
//
// The ThrottleBehavior* constants below are the only values accepted
// by Validate.
type ThrottleBehavior string
const (
// ThrottleBehaviorError is the behavior that returns an error
// immediately on throttle; the event is not sent.
ThrottleBehaviorError ThrottleBehavior = "error"
// ThrottleBehaviorDelay is the behavior that delays sending
// until the event is no longer throttled.
ThrottleBehaviorDelay ThrottleBehavior = "delay"
)
// GubernatorConfig holds Gubernator-specific configuration for the ratelimit processor.
//
// Validate rejects a configuration whose Endpoint (promoted from the
// embedded ClientConfig) is empty.
type GubernatorConfig struct {
// ClientConfig holds the gRPC client settings. The squash tag places
// its keys at the same level as the other fields of this struct
// rather than under a nested key. Unmarshal resets it to
// configgrpc.NewDefaultClientConfig's value before decoding.
configgrpc.ClientConfig `mapstructure:",squash"`
// Behavior holds a list of Gubernator behaviors. If this is unspecified,
// Gubernator's default batching behavior is used.
Behavior []GubernatorBehavior `mapstructure:"behavior"`
}
// GubernatorBehavior controls Gubernator's behavior.
//
// A value is valid when its upper-cased form is a key of
// gubernator.Behavior_value; see Validate.
type GubernatorBehavior string
// createDefaultConfig returns the processor's default configuration:
// the "requests" strategy combined with the "error" throttle behavior.
// Every other field is left at its zero value.
func createDefaultConfig() component.Config {
	var defaults Config
	defaults.Strategy = StrategyRateLimitRequests
	defaults.ThrottleBehavior = ThrottleBehaviorError
	return &defaults
}
// Validate checks that the rate limit parameters are usable.
//
// It returns nil when both Rate and Burst are greater than zero.
// Otherwise it returns an error that joins one message per offending
// field, so a single call reports every problem found.
func (config *Config) Validate() error {
	var errs []error
	if config.Rate <= 0 {
		// The messages are constant, so errors.New is used instead of fmt.Errorf.
		errs = append(errs, errors.New("rate must be greater than zero"))
	}
	if config.Burst <= 0 {
		errs = append(errs, errors.New("burst must be greater than zero"))
	}
	// errors.Join returns nil when errs is empty.
	return errors.Join(errs...)
}
// Validate reports whether s is one of the supported rate limiting
// strategies. It returns nil on a match; otherwise the error names the
// rejected value together with every accepted one.
func (s Strategy) Validate() error {
	supported := [...]Strategy{
		StrategyRateLimitRequests,
		StrategyRateLimitRecords,
		StrategyRateLimitBytes,
	}
	// Collect the accepted names while searching, so that a miss can
	// report them without a second pass.
	accepted := make([]string, 0, len(supported))
	for _, candidate := range supported {
		if s == candidate {
			return nil
		}
		accepted = append(accepted, string(candidate))
	}
	return fmt.Errorf("invalid strategy %q, expected one of %q", s, accepted)
}
// Validate reports whether s is one of the supported throttle behaviors.
// It returns nil on a match; otherwise the error names the rejected
// value together with every accepted one.
func (s ThrottleBehavior) Validate() error {
	supported := [...]ThrottleBehavior{
		ThrottleBehaviorError,
		ThrottleBehaviorDelay,
	}
	// Collect the accepted names while searching, so that a miss can
	// report them without a second pass.
	accepted := make([]string, 0, len(supported))
	for _, candidate := range supported {
		if s == candidate {
			return nil
		}
		accepted = append(accepted, string(candidate))
	}
	return fmt.Errorf("invalid throttle behavior %q, expected one of %q", s, accepted)
}
// Unmarshal decodes parser into config.
//
// The embedded ClientConfig is first overwritten with the value returned
// by configgrpc.NewDefaultClientConfig, and parser is then unmarshalled
// on top of it. Any error from parser.Unmarshal is returned unchanged.
func (config *GubernatorConfig) Unmarshal(parser *confmap.Conf) error {
	config.ClientConfig = *configgrpc.NewDefaultClientConfig()
	return parser.Unmarshal(config)
}
// Validate checks the Gubernator configuration. It returns nil when
// Endpoint is set, and an error naming "gubernator.endpoint" when it
// is empty.
func (config *GubernatorConfig) Validate() error {
	if config.Endpoint != "" {
		return nil
	}
	// Wrapped with errors.Join so the returned error keeps the joined
	// form this method has always produced.
	return errors.Join(fmt.Errorf(msgEmptyField, "gubernator.endpoint"))
}
// Validate reports whether b names a known Gubernator behavior.
//
// The name is upper-cased before being looked up in
// gubernator.Behavior_value. On a miss, the returned error lists every
// name found in gubernator.Behavior_name, lower-cased and sorted.
func (b GubernatorBehavior) Validate() error {
	lookupKey := strings.ToUpper(string(b))
	if _, known := gubernator.Behavior_value[lookupKey]; known {
		return nil
	}

	accepted := make([]string, len(gubernator.Behavior_name))
	next := 0
	for _, upper := range gubernator.Behavior_name {
		accepted[next] = strings.ToLower(upper)
		next++
	}
	// Sorting keeps the message independent of iteration order.
	sort.Strings(accepted)
	return fmt.Errorf("invalid Gubernator behavior %q, expected one of %q", b, accepted)
}