processor/ratelimitprocessor/factory.go (117 lines of code) (raw):

// Licensed to Elasticsearch B.V. under one or more contributor // license agreements. See the NOTICE file distributed with // this work for additional information regarding copyright // ownership. Elasticsearch B.V. licenses this file to you under // the Apache License, Version 2.0 (the "License"); you may // not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, // software distributed under the License is distributed on an // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY // KIND, either express or implied. See the License for the // specific language governing permissions and limitations // under the License. package ratelimitprocessor // import "github.com/elastic/opentelemetry-collector-components/processor/ratelimitprocessor" import ( "context" "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/consumer" "go.opentelemetry.io/collector/consumer/xconsumer" "go.opentelemetry.io/collector/pdata/plog" "go.opentelemetry.io/collector/pdata/pmetric" "go.opentelemetry.io/collector/pdata/pprofile" "go.opentelemetry.io/collector/pdata/ptrace" "go.opentelemetry.io/collector/processor" "go.opentelemetry.io/collector/processor/xprocessor" "github.com/elastic/opentelemetry-collector-components/internal/sharedcomponent" "github.com/elastic/opentelemetry-collector-components/processor/ratelimitprocessor/internal/metadata" ) var rateLimiters = sharedcomponent.NewMap[*Config, rateLimiterComponent]() type rateLimiterComponent interface { component.Component RateLimiter } func NewFactory() xprocessor.Factory { return xprocessor.NewFactory( metadata.Type, createDefaultConfig, xprocessor.WithProfiles(createProfilesProcessor, metadata.ProfilesStability), xprocessor.WithTraces(createTracesProcessor, metadata.TracesStability), xprocessor.WithMetrics(createMetricsProcessor, metadata.MetricsStability), xprocessor.WithLogs(createLogsProcessor, metadata.LogsStability), ) } func getRateLimiter( config *Config, set processor.Settings, ) (*sharedcomponent.Component[rateLimiterComponent], error) { return rateLimiters.LoadOrStore(config, func() (rateLimiterComponent, error) { if config.Gubernator != nil { return newGubernatorRateLimiter(config, set) } return newLocalRateLimiter(config, set) }) } func createLogsProcessor( _ context.Context, set processor.Settings, cfg component.Config, nextConsumer consumer.Logs, ) (processor.Logs, error) { config := cfg.(*Config) rateLimiter, err := getRateLimiter(config, set) if err != nil { return nil, err } return NewLogsRateLimiterProcessor( rateLimiter, config.Strategy, func(ctx context.Context, ld plog.Logs) error { return nextConsumer.ConsumeLogs(ctx, ld) }, ), nil } func createMetricsProcessor( _ context.Context, set processor.Settings, cfg component.Config, nextConsumer consumer.Metrics, ) (processor.Metrics, error) { config := cfg.(*Config) rateLimiter, err := getRateLimiter(config, set) if err != nil { return nil, err } return NewMetricsRateLimiterProcessor( rateLimiter, config.Strategy, func(ctx context.Context, md pmetric.Metrics) error { return nextConsumer.ConsumeMetrics(ctx, md) }, ), nil } func createTracesProcessor( _ context.Context, set processor.Settings, cfg component.Config, nextConsumer consumer.Traces, ) (processor.Traces, error) { config := cfg.(*Config) rateLimiter, err := getRateLimiter(config, set) if err != nil { return nil, err } return NewTracesRateLimiterProcessor( rateLimiter, config.Strategy, func(ctx context.Context, td ptrace.Traces) error { return nextConsumer.ConsumeTraces(ctx, td) }, ), nil } func createProfilesProcessor( _ context.Context, set processor.Settings, cfg component.Config, nextConsumer xconsumer.Profiles, ) (xprocessor.Profiles, error) { config := cfg.(*Config) rateLimiter, err := getRateLimiter(config, set) if err != nil { return nil, err } return NewProfilesRateLimiterProcessor( rateLimiter, config.Strategy, func(ctx context.Context, td pprofile.Profiles) error { return nextConsumer.ConsumeProfiles(ctx, td) }, ), nil }