processor/ratelimitprocessor/processor.go (203 lines of code) (raw):
// Licensed to Elasticsearch B.V. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package ratelimitprocessor // import "github.com/elastic/opentelemetry-collector-components/processor/ratelimitprocessor"
import (
"context"
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/consumer"
"go.opentelemetry.io/collector/pdata/plog"
"go.opentelemetry.io/collector/pdata/pmetric"
"go.opentelemetry.io/collector/pdata/pprofile"
"go.opentelemetry.io/collector/pdata/ptrace"
"github.com/elastic/opentelemetry-collector-components/internal/sharedcomponent"
)
type rateLimiterProcessor struct {
component.Component
rl RateLimiter
}
type LogsRateLimiterProcessor struct {
rateLimiterProcessor
count func(logs plog.Logs) int
next func(ctx context.Context, logs plog.Logs) error
}
type MetricsRateLimiterProcessor struct {
rateLimiterProcessor
count func(metrics pmetric.Metrics) int
next func(ctx context.Context, metrics pmetric.Metrics) error
}
type TracesRateLimiterProcessor struct {
rateLimiterProcessor
count func(traces ptrace.Traces) int
next func(ctx context.Context, traces ptrace.Traces) error
}
type ProfilesRateLimiterProcessor struct {
rateLimiterProcessor
count func(profiles pprofile.Profiles) int
next func(ctx context.Context, profiles pprofile.Profiles) error
}
func NewLogsRateLimiterProcessor(
rateLimiter *sharedcomponent.Component[rateLimiterComponent],
strategy Strategy,
next func(ctx context.Context, logs plog.Logs) error,
) *LogsRateLimiterProcessor {
return &LogsRateLimiterProcessor{
rateLimiterProcessor: rateLimiterProcessor{
Component: rateLimiter,
rl: rateLimiter.Unwrap(),
},
count: getLogsCountFunc(strategy),
next: next,
}
}
func NewMetricsRateLimiterProcessor(
rateLimiter *sharedcomponent.Component[rateLimiterComponent],
strategy Strategy,
next func(ctx context.Context, metrics pmetric.Metrics) error,
) *MetricsRateLimiterProcessor {
return &MetricsRateLimiterProcessor{
rateLimiterProcessor: rateLimiterProcessor{
Component: rateLimiter,
rl: rateLimiter.Unwrap(),
},
count: getMetricsCountFunc(strategy),
next: next,
}
}
func NewTracesRateLimiterProcessor(
rateLimiter *sharedcomponent.Component[rateLimiterComponent],
strategy Strategy,
next func(ctx context.Context, traces ptrace.Traces) error,
) *TracesRateLimiterProcessor {
return &TracesRateLimiterProcessor{
rateLimiterProcessor: rateLimiterProcessor{
Component: rateLimiter,
rl: rateLimiter.Unwrap(),
},
count: getTracesCountFunc(strategy),
next: next,
}
}
func NewProfilesRateLimiterProcessor(
rateLimiter *sharedcomponent.Component[rateLimiterComponent],
strategy Strategy,
next func(ctx context.Context, profiles pprofile.Profiles) error,
) *ProfilesRateLimiterProcessor {
return &ProfilesRateLimiterProcessor{
rateLimiterProcessor: rateLimiterProcessor{
Component: rateLimiter,
rl: rateLimiter.Unwrap(),
},
count: getProfilesCountFunc(strategy),
next: next,
}
}
func (r *LogsRateLimiterProcessor) Capabilities() consumer.Capabilities {
return consumer.Capabilities{MutatesData: false}
}
func (r *MetricsRateLimiterProcessor) Capabilities() consumer.Capabilities {
return consumer.Capabilities{MutatesData: false}
}
func (r *TracesRateLimiterProcessor) Capabilities() consumer.Capabilities {
return consumer.Capabilities{MutatesData: false}
}
func (r *ProfilesRateLimiterProcessor) Capabilities() consumer.Capabilities {
return consumer.Capabilities{MutatesData: false}
}
func (r *LogsRateLimiterProcessor) ConsumeLogs(ctx context.Context, ld plog.Logs) error {
hits := r.count(ld)
if err := r.rl.RateLimit(ctx, hits); err != nil {
return err
}
return r.next(ctx, ld)
}
func (r *MetricsRateLimiterProcessor) ConsumeMetrics(ctx context.Context, md pmetric.Metrics) error {
hits := r.count(md)
if err := r.rl.RateLimit(ctx, hits); err != nil {
return err
}
return r.next(ctx, md)
}
func (r *TracesRateLimiterProcessor) ConsumeTraces(ctx context.Context, td ptrace.Traces) error {
hits := r.count(td)
if err := r.rl.RateLimit(ctx, hits); err != nil {
return err
}
return r.next(ctx, td)
}
func (r *ProfilesRateLimiterProcessor) ConsumeProfiles(ctx context.Context, pd pprofile.Profiles) error {
hits := r.count(pd)
if err := r.rl.RateLimit(ctx, hits); err != nil {
return err
}
return r.next(ctx, pd)
}
func getLogsCountFunc(strategy Strategy) func(ld plog.Logs) int {
switch strategy {
case StrategyRateLimitRequests:
return func(ld plog.Logs) int {
return 1
}
case StrategyRateLimitRecords:
return func(ld plog.Logs) int {
return ld.LogRecordCount()
}
case StrategyRateLimitBytes:
return func(ld plog.Logs) int {
pm := plog.ProtoMarshaler{}
return pm.LogsSize(ld)
}
}
return nil
}
func getMetricsCountFunc(strategy Strategy) func(md pmetric.Metrics) int {
switch strategy {
case StrategyRateLimitRequests:
return func(md pmetric.Metrics) int {
return 1
}
case StrategyRateLimitRecords:
return func(md pmetric.Metrics) int {
return md.DataPointCount()
}
case StrategyRateLimitBytes:
return func(md pmetric.Metrics) int {
pm := pmetric.ProtoMarshaler{}
return pm.MetricsSize(md)
}
}
// cannot happen, prevented by config.Validate()
return nil
}
func getTracesCountFunc(strategy Strategy) func(td ptrace.Traces) int {
switch strategy {
case StrategyRateLimitRequests:
return func(td ptrace.Traces) int {
return 1
}
case StrategyRateLimitRecords:
return func(td ptrace.Traces) int {
return td.SpanCount()
}
case StrategyRateLimitBytes:
return func(td ptrace.Traces) int {
pm := ptrace.ProtoMarshaler{}
return pm.TracesSize(td)
}
}
// cannot happen, prevented by config.Validate()
return nil
}
func getProfilesCountFunc(strategy Strategy) func(pd pprofile.Profiles) int {
switch strategy {
case StrategyRateLimitRequests:
return func(pd pprofile.Profiles) int {
return 1
}
case StrategyRateLimitRecords:
return func(pd pprofile.Profiles) int {
return pd.SampleCount()
}
case StrategyRateLimitBytes:
return func(pd pprofile.Profiles) int {
pm := pprofile.ProtoMarshaler{}
return pm.ProfilesSize(pd)
}
}
// cannot happen, prevented by config.Validate()
return nil
}