processor/ratelimitprocessor/local.go (49 lines of code) (raw):
// Licensed to Elasticsearch B.V. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package ratelimitprocessor // import "github.com/elastic/opentelemetry-collector-components/processor/ratelimitprocessor"
import (
"context"
"sync"
"time"
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/processor"
"golang.org/x/time/rate"
)
var _ RateLimiter = (*localRateLimiter)(nil)
type localRateLimiter struct {
cfg *Config
set processor.Settings
// TODO use an LRU to keep a cap on the number of limiters.
// When the LRU capacity is exceeded, reuse the evicted limiter.
limiters sync.Map
}
func newLocalRateLimiter(cfg *Config, set processor.Settings) (*localRateLimiter, error) {
return &localRateLimiter{cfg: cfg, set: set}, nil
}
func (r *localRateLimiter) Start(ctx context.Context, host component.Host) error {
return nil
}
func (r *localRateLimiter) Shutdown(ctx context.Context) error {
r.limiters = sync.Map{}
return nil
}
func (r *localRateLimiter) RateLimit(ctx context.Context, hits int) error {
// Each (shared) processor gets its own rate limiter,
// so it's enough to use client metadata-based unique key.
key := getUniqueKey(ctx, r.cfg.MetadataKeys)
v, _ := r.limiters.LoadOrStore(key, rate.NewLimiter(rate.Limit(r.cfg.Rate), r.cfg.Burst))
limiter := v.(*rate.Limiter)
switch r.cfg.ThrottleBehavior {
case ThrottleBehaviorError:
if ok := limiter.AllowN(time.Now(), hits); !ok {
return errTooManyRequests
}
case ThrottleBehaviorDelay:
r := limiter.ReserveN(time.Now(), hits)
if !r.OK() {
return errTooManyRequests
}
timer := time.NewTimer(r.Delay())
defer timer.Stop()
select {
case <-ctx.Done():
return ctx.Err()
case <-timer.C:
}
}
return nil
}