in processor/ratelimitprocessor/gubernator.go [87:142]
// RateLimit asks gubernator whether hits more units may be admitted for the
// key derived from ctx and the configured metadata keys.
//
// It returns nil when the request is under the limit. When the request is
// not under the limit, the outcome depends on r.cfg.ThrottleBehavior:
//   - ThrottleBehaviorError: the rejection is logged and errTooManyRequests
//     is returned.
//   - ThrottleBehaviorDelay: the call blocks until the reset time reported by
//     gubernator and then returns nil, or returns ctx.Err() if ctx ends
//     first.
//   - any other value: nil is returned.
//
// A failed RPC, or an error reported inside the response, is logged and
// surfaced as errRateLimitInternalError. A response list whose length is not
// exactly one is returned as a descriptive error.
func (r *gubernatorRateLimiter) RateLimit(ctx context.Context, hits int) error {
	uniqueKey := getUniqueKey(ctx, r.cfg.MetadataKeys)

	// Keep the time.Time value (which carries a monotonic reading) so the time
	// spent waiting on the RPC can be measured later without being affected by
	// wall-clock adjustments.
	start := time.Now()
	createdAt := start.UnixMilli()

	getRateLimitsResp, err := r.client.GetRateLimits(ctx, &gubernator.GetRateLimitsReq{
		Requests: []*gubernator.RateLimitReq{{
			Name:      r.set.ID.String(),
			UniqueKey: uniqueKey,
			Hits:      int64(hits),
			Behavior:  r.behavior,
			Algorithm: gubernator.Algorithm_LEAKY_BUCKET,
			Limit:     int64(r.cfg.Rate), // rate is per second
			Burst:     int64(r.cfg.Burst),
			Duration:  1000, // duration is in milliseconds, i.e. 1s
			CreatedAt: &createdAt,
		}},
	})
	if err != nil {
		r.set.Logger.Error("error executing gubernator rate limit request", zap.Error(err))
		return errRateLimitInternalError
	}

	// Inside the gRPC response, we should have a single-item list of responses.
	responses := getRateLimitsResp.GetResponses()
	if n := len(responses); n != 1 {
		return fmt.Errorf("expected 1 response from gubernator, got %d", n)
	}
	resp := responses[0]
	if resp.GetError() != "" {
		r.set.Logger.Error("failed to get response from gubernator", zap.Error(errors.New(resp.GetError())))
		return errRateLimitInternalError
	}

	if resp.GetStatus() == gubernator.Status_UNDER_LIMIT {
		return nil
	}

	// Same logic as local
	switch r.cfg.ThrottleBehavior {
	case ThrottleBehaviorError:
		r.set.Logger.Error(
			"request is over the limits defined by the rate limiter",
			zap.Error(errTooManyRequests),
			zap.String("processor_id", r.set.ID.String()),
			zap.Strings("metadata_keys", r.cfg.MetadataKeys),
		)
		return errTooManyRequests
	case ThrottleBehaviorDelay:
		// ResetTime-createdAt is the wait measured from the moment the request
		// was created. Part of that wait has already elapsed while the RPC was
		// in flight, so subtract it; otherwise every delayed request would
		// sleep past the reset time by the RPC round trip.
		delay := time.Duration(resp.GetResetTime()-createdAt)*time.Millisecond - time.Since(start)
		if delay <= 0 {
			// The reset time has already been reached: there is nothing to
			// wait for. ctx.Err() is nil unless ctx has already ended.
			return ctx.Err()
		}
		timer := time.NewTimer(delay)
		defer timer.Stop()
		select {
		case <-ctx.Done():
			return ctx.Err()
		case <-timer.C:
		}
	}
	return nil
}