processor/ratelimitprocessor/gubernator.go (109 lines of code) (raw):
// Licensed to Elasticsearch B.V. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package ratelimitprocessor // import "github.com/elastic/opentelemetry-collector-components/processor/ratelimitprocessor"
import (
"context"
"errors"
"fmt"
"strings"
"time"
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/processor"
"go.uber.org/zap"
"google.golang.org/grpc"
"github.com/elastic/opentelemetry-collector-components/processor/ratelimitprocessor/internal/gubernator"
)
// Compile-time assertion that *gubernatorRateLimiter satisfies RateLimiter.
var _ RateLimiter = (*gubernatorRateLimiter)(nil)
// gubernatorRateLimiter is a RateLimiter that delegates rate limiting
// decisions to a Gubernator service reached over gRPC.
type gubernatorRateLimiter struct {
// cfg is the processor configuration; its Gubernator section is used to
// create the client connection, and MetadataKeys, Rate, Burst and
// ThrottleBehavior are read on every RateLimit call.
cfg *Config
// set supplies the telemetry settings, the component ID (sent as the
// rate limit name) and the logger.
set processor.Settings
// behavior is the bitwise OR of the configured Gubernator behaviors and
// is attached to every rate limit request.
behavior gubernator.Behavior
// conn is the gRPC connection created by Start and closed by Shutdown;
// it is nil before Start and after Shutdown.
conn *grpc.ClientConn
// client is the Gubernator V1 client built on top of conn in Start.
client gubernator.V1Client
}
// newGubernatorRateLimiter builds a Gubernator-backed rate limiter from cfg
// and set.
//
// Every entry of cfg.Gubernator.Behavior is upper-cased and looked up in
// gubernator.Behavior_value; the matching values are OR-ed together into a
// single behavior value that is stored on the limiter. An entry without a
// match results in an "invalid behavior" error. No connection is opened
// here; that is done by Start.
func newGubernatorRateLimiter(cfg *Config, set processor.Settings) (*gubernatorRateLimiter, error) {
	var flags int32
	for _, name := range cfg.Gubernator.Behavior {
		bit, known := gubernator.Behavior_value[strings.ToUpper(string(name))]
		if !known {
			return nil, fmt.Errorf("invalid behavior %q", name)
		}
		flags |= bit
	}

	limiter := &gubernatorRateLimiter{cfg: cfg, set: set}
	limiter.behavior = gubernator.Behavior(flags)
	return limiter, nil
}
// Start opens the gRPC connection to Gubernator described by
// r.cfg.Gubernator and creates the V1 client on top of it.
//
// It returns a wrapped error when the connection cannot be created; in that
// case neither r.conn nor r.client is modified.
func (r *gubernatorRateLimiter) Start(ctx context.Context, host component.Host) error {
	// An auth section with an empty authenticator ID has to be dropped
	// explicitly, otherwise creating the connection fails with:
	// `failed to resolve authenticator "": authenticator not found`
	if auth := r.cfg.Gubernator.Auth; auth != nil && auth.AuthenticatorID.String() == "" {
		r.cfg.Gubernator.Auth = nil
	}

	clientConn, err := r.cfg.Gubernator.ToClientConn(ctx, host, r.set.TelemetrySettings)
	if err != nil {
		return fmt.Errorf("failed to connect to gubernator: %w", err)
	}

	r.conn, r.client = clientConn, gubernator.NewV1Client(clientConn)
	return nil
}
// Shutdown closes the gRPC connection, if one is open, and clears both the
// connection and the client. It returns the error reported by Close, or nil
// when there was no connection to close.
func (r *gubernatorRateLimiter) Shutdown(ctx context.Context) error {
	if r.conn == nil {
		return nil
	}

	closeErr := r.conn.Close()
	r.conn, r.client = nil, nil
	return closeErr
}
// RateLimit asks Gubernator whether hits more units may be consumed for the
// unique key derived from ctx and the configured metadata keys.
//
// A single leaky-bucket request is sent, using the configured rate as the
// per-second limit and the configured burst. The outcome is:
//   - nil when the response status is UNDER_LIMIT;
//   - errRateLimitInternalError when the gRPC call fails or the response
//     carries an error string (both are logged);
//   - a formatted error when the response list does not hold exactly one item;
//   - otherwise it depends on r.cfg.ThrottleBehavior: ThrottleBehaviorError
//     logs and returns errTooManyRequests, ThrottleBehaviorDelay blocks until
//     the reported reset time (returning ctx.Err() if ctx ends first) and then
//     returns nil, and any other value returns nil.
func (r *gubernatorRateLimiter) RateLimit(ctx context.Context, hits int) error {
	key := getUniqueKey(ctx, r.cfg.MetadataKeys)
	nowMillis := time.Now().UnixMilli()

	limitReq := &gubernator.RateLimitReq{
		Name:      r.set.ID.String(),
		UniqueKey: key,
		Hits:      int64(hits),
		Behavior:  r.behavior,
		Algorithm: gubernator.Algorithm_LEAKY_BUCKET,
		Limit:     int64(r.cfg.Rate), // rate is per second
		Burst:     int64(r.cfg.Burst),
		Duration:  1000, // duration is in milliseconds, i.e. 1s
		CreatedAt: &nowMillis,
	}
	reply, err := r.client.GetRateLimits(ctx, &gubernator.GetRateLimitsReq{
		Requests: []*gubernator.RateLimitReq{limitReq},
	})
	if err != nil {
		r.set.Logger.Error("error executing gubernator rate limit request", zap.Error(err))
		return errRateLimitInternalError
	}

	// One request was sent, so exactly one response is expected back.
	// NOTE(review): unlike the other failure paths, this one is not logged and
	// does not return errRateLimitInternalError — confirm that is intentional.
	items := reply.GetResponses()
	if len(items) != 1 {
		return fmt.Errorf("expected 1 response from gubernator, got %d", len(items))
	}

	result := items[0]
	if msg := result.GetError(); msg != "" {
		r.set.Logger.Error("failed to get response from gubernator", zap.Error(errors.New(msg)))
		return errRateLimitInternalError
	}

	if result.GetStatus() == gubernator.Status_UNDER_LIMIT {
		return nil
	}

	// Not under the limit: apply the configured throttle behavior, the same
	// way the local rate limiter does.
	switch r.cfg.ThrottleBehavior {
	case ThrottleBehaviorError:
		r.set.Logger.Error(
			"request is over the limits defined by the rate limiter",
			zap.Error(errTooManyRequests),
			zap.String("processor_id", r.set.ID.String()),
			zap.Strings("metadata_keys", r.cfg.MetadataKeys),
		)
		return errTooManyRequests
	case ThrottleBehaviorDelay:
		// The reset time is compared against the creation timestamp that was
		// sent with the request; both are treated as milliseconds.
		wait := time.Duration(result.GetResetTime()-nowMillis) * time.Millisecond
		timer := time.NewTimer(wait)
		defer timer.Stop()
		select {
		case <-timer.C:
		case <-ctx.Done():
			return ctx.Err()
		}
	}
	return nil
}