in processor/ratelimitprocessor/local.go [53:78]
// RateLimit applies the local rate limit to a request consuming hits tokens.
//
// A limiter is kept per unique key, derived from ctx and cfg.MetadataKeys;
// it is created on first use with the configured rate and burst.
//
// With ThrottleBehaviorError, it returns errTooManyRequests immediately when
// hits tokens are not available. With ThrottleBehaviorDelay, it reserves hits
// tokens and blocks until the reservation may be used; it returns
// errTooManyRequests if the limiter cannot grant the reservation, or
// ctx.Err() if ctx is done before the delay elapses (in which case the
// reservation is cancelled so its tokens are handed back to the limiter).
// Any other throttle behavior returns nil without consuming tokens.
func (r *localRateLimiter) RateLimit(ctx context.Context, hits int) error {
	// Each (shared) processor gets its own rate limiter,
	// so it's enough to use client metadata-based unique key.
	key := getUniqueKey(ctx, r.cfg.MetadataKeys)

	// Fast path: only allocate a new limiter when none is stored for this
	// key yet. LoadOrStore still resolves the race between concurrent
	// first-time callers.
	v, ok := r.limiters.Load(key)
	if !ok {
		v, _ = r.limiters.LoadOrStore(key, rate.NewLimiter(rate.Limit(r.cfg.Rate), r.cfg.Burst))
	}
	limiter := v.(*rate.Limiter)

	switch r.cfg.ThrottleBehavior {
	case ThrottleBehaviorError:
		if !limiter.AllowN(time.Now(), hits) {
			return errTooManyRequests
		}
	case ThrottleBehaviorDelay:
		now := time.Now()
		reservation := limiter.ReserveN(now, hits)
		if !reservation.OK() {
			return errTooManyRequests
		}
		// Measure the delay against the same instant the reservation was
		// made at, and skip the timer entirely when no wait is needed.
		delay := reservation.DelayFrom(now)
		if delay == 0 {
			return nil
		}
		timer := time.NewTimer(delay)
		defer timer.Stop()
		select {
		case <-ctx.Done():
			// The request will not proceed: give the reserved tokens
			// back so other callers are not throttled on its behalf.
			reservation.Cancel()
			return ctx.Err()
		case <-timer.C:
		}
	}
	return nil
}