receiver/integrationreceiver/receiver.go (264 lines of code) (raw):

// Licensed to Elasticsearch B.V. under one or more contributor // license agreements. See the NOTICE file distributed with // this work for additional information regarding copyright // ownership. Elasticsearch B.V. licenses this file to you under // the Apache License, Version 2.0 (the "License"); you may // not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, // software distributed under the License is distributed on an // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY // KIND, either express or implied. See the License for the // specific language governing permissions and limitations // under the License. package integrationreceiver // import "github.com/elastic/opentelemetry-collector-components/receiver/integrationreceiver" import ( "context" "errors" "fmt" "slices" "go.uber.org/zap" "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/confmap" "go.opentelemetry.io/collector/consumer" "go.opentelemetry.io/collector/pipeline" otelpipeline "go.opentelemetry.io/collector/pipeline" "go.opentelemetry.io/collector/processor" "go.opentelemetry.io/collector/receiver" "github.com/elastic/opentelemetry-collector-components/pkg/integrations" ) type integrationReceiver struct { params receiver.Settings config *Config components []component.Component nextMetricsConsumer consumer.Metrics nextLogsConsumer consumer.Logs nextTracesConsumer consumer.Traces } func newTemplateLogsReceiver(params receiver.Settings, config *Config, consumer consumer.Logs) *integrationReceiver { return &integrationReceiver{ params: params, config: config, nextLogsConsumer: consumer, } } func newTemplateMetricsReceiver(params receiver.Settings, config *Config, consumer consumer.Metrics) *integrationReceiver { return &integrationReceiver{ params: params, config: config, nextMetricsConsumer: consumer, } } func newTemplateTracesReceiver(params receiver.Settings, config *Config, consumer consumer.Traces) *integrationReceiver { return &integrationReceiver{ params: params, config: config, nextTracesConsumer: consumer, } } // factoryGetter is an interface that the component.Host passed to receivercreator's Start function must implement // GetFactory is optional in hosts since 107.0, but we require it. type factoryGetter interface { component.Host GetFactory(component.Kind, component.Type) component.Factory } // Start creates and starts a receiver composed by other components. It is composed of at least one pipeline, each one // of them composed by one receiver, and optionally a set of processors to customize the received data. func (r *integrationReceiver) Start(ctx context.Context, ch component.Host) error { host, ok := ch.(factoryGetter) if !ok { return fmt.Errorf("integrationreceiver is not compatible with the provided component.Host") } integration, err := integrations.Find(ctx, r.params.Logger, host, r.config.Name) if err != nil { return fmt.Errorf("failed to find integration %q: %w", r.config.Name, err) } config, err := integration.Resolve(ctx, r.config.Parameters, r.config.Pipelines) if err != nil { return fmt.Errorf("failed to build receiver pipelines for integration %q: %w", r.config.Name, err) } // Confidence check, we should have as many unique pipelines as requested. if found, expected := len(config.Pipelines), len(r.config.Pipelines); expected != found { return fmt.Errorf("unexpected number of pipelines configured, found %d, expected %d", found, expected) } for id, pipeline := range config.Pipelines { if !r.hasConsumerForPipelineType(id) { continue } err := r.startPipeline(ctx, host, *config, id, pipeline) if err != nil { // Shutdown components that had been already started for cleanup. if err := r.Shutdown(ctx); err != nil { r.params.Logger.Warn("Failed to shutdown all components on error while starting", zap.String("error", err.Error())) } return fmt.Errorf("failed to start pipeline %q: %w", id, err) } } return nil } func (r *integrationReceiver) hasConsumerForPipelineType(id pipeline.ID) bool { switch id.Signal() { case pipeline.SignalLogs: return r.nextLogsConsumer != nil case pipeline.SignalMetrics: return r.nextMetricsConsumer != nil case pipeline.SignalTraces: return r.nextTracesConsumer != nil default: r.params.Logger.Warn("unexpected signal type in integration", zap.String("id", id.String())) return false } } func (r *integrationReceiver) startPipeline(ctx context.Context, host factoryGetter, config integrations.Config, pipelineID pipeline.ID, pipeline integrations.PipelineConfig) error { consumerChain := struct { logs consumer.Logs metrics consumer.Metrics traces consumer.Traces }{ logs: r.nextLogsConsumer, metrics: r.nextMetricsConsumer, traces: r.nextTracesConsumer, } if pipeline.Receiver == nil { return errors.New("no receiver in pipeline configuration") } receiverConfig, found := config.Receivers[*pipeline.Receiver] if !found { return fmt.Errorf("receiver %q not found", pipeline.Receiver) } receiverFactory, ok := host.GetFactory(component.KindReceiver, pipeline.Receiver.Type()).(receiver.Factory) if !ok { return fmt.Errorf("could not find receiver factory for %q", pipeline.Receiver.Type()) } preparedConfig, err := convertComponentConfig(receiverFactory.CreateDefaultConfig, receiverConfig) if err != nil { return fmt.Errorf("could not compose receiver config for %s: %w", pipeline.Receiver.String(), err) } var components []component.Component processors := slices.Clone(pipeline.Processors) slices.Reverse(processors) for i, id := range processors { processorConfig, found := config.Processors[id] if !found { return fmt.Errorf("processor %q not found", id) } factory, ok := host.GetFactory(component.KindProcessor, id.Type()).(processor.Factory) if !ok { return fmt.Errorf("could not find processor factory for %q", id.Type()) } config, err := convertComponentConfig(factory.CreateDefaultConfig, processorConfig) if err != nil { return fmt.Errorf("could not compose processor config for %s: %w", id.String(), err) } params := processor.Settings{ TelemetrySettings: r.params.TelemetrySettings, BuildInfo: r.params.BuildInfo, } params.ID = component.NewIDWithName(factory.Type(), fmt.Sprintf("%s-%s-%d", r.params.ID, pipelineID, i)) params.Logger = params.Logger.With(zap.String("name", params.ID.String())) if consumerChain.logs != nil { logs, err := factory.CreateLogs(ctx, params, config, consumerChain.logs) if err != nil { return fmt.Errorf("failed to create logs processor %s: %w", params.ID, err) } consumerChain.logs = logs components = append(components, logs) } if consumerChain.metrics != nil { metrics, err := factory.CreateMetrics(ctx, params, config, consumerChain.metrics) if err != nil { return fmt.Errorf("failed to create metrics processor %s: %w", params.ID, err) } consumerChain.metrics = metrics components = append(components, metrics) } if consumerChain.traces != nil { traces, err := factory.CreateTraces(ctx, params, config, consumerChain.traces) if err != nil { return fmt.Errorf("failed to create traces processor %s: %w", params.ID, err) } consumerChain.traces = traces components = append(components, traces) } } params := r.params params.ID = component.NewIDWithName(receiverFactory.Type(), fmt.Sprintf("%s-receiver", pipelineID)) params.Logger = params.Logger.With(zap.String("name", params.ID.String())) receiversCreated := 0 if consumerChain.logs != nil { logs, err := receiverFactory.CreateLogs(ctx, params, preparedConfig, consumerChain.logs) switch { case err == nil: components = append(components, logs) receiversCreated += 1 case errors.Is(err, otelpipeline.ErrSignalNotSupported): r.params.Logger.Debug("receiver does not support logs telemetry type", zap.String("integration", r.params.ID.String()), zap.String("receiver", params.ID.String())) default: return fmt.Errorf("failed to create logs receiver %s: %w", params.ID, err) } } if consumerChain.metrics != nil { metrics, err := receiverFactory.CreateMetrics(ctx, params, preparedConfig, consumerChain.metrics) switch { case err == nil: components = append(components, metrics) receiversCreated += 1 case errors.Is(err, otelpipeline.ErrSignalNotSupported): r.params.Logger.Debug("receiver does not support metrics telemetry type", zap.String("integration", r.params.ID.String()), zap.String("receiver", params.ID.String())) default: return fmt.Errorf("failed to create metrics receiver %s: %w", params.ID, err) } } if consumerChain.traces != nil { traces, err := receiverFactory.CreateTraces(ctx, params, preparedConfig, consumerChain.traces) switch { case err == nil: components = append(components, traces) receiversCreated += 1 case errors.Is(err, otelpipeline.ErrSignalNotSupported): r.params.Logger.Debug("receiver does not support traces telemetry type", zap.String("integration", r.params.ID.String()), zap.String("receiver", params.ID.String())) default: return fmt.Errorf("failed to create traces receiver %s: %w", params.ID, err) } } // If no receiver has been created the rest of the pipeline won't be used, so don't keep it. if receiversCreated == 0 { // Shutting down created components out of kindness, because they haven't been started yet. if err := shutdownComponents(ctx, components); err != nil { r.params.Logger.Error("failed to cleanup processors after receiver was not created", zap.String("integration", r.params.ID.String()), zap.String("receiver", params.ID.String())) } return nil } for _, component := range components { err := component.Start(ctx, host) if err != nil { if err := r.Shutdown(ctx); err != nil { r.params.Logger.Error("failed to cleanup processors after a component failed to start", zap.String("integration", r.params.ID.String()), zap.String("receiver", params.ID.String()), zap.Error(err), ) } return fmt.Errorf("failed to start component %q: %w", component, err) } r.components = append(r.components, component) } return nil } func (r *integrationReceiver) Shutdown(ctx context.Context) error { err := shutdownComponents(ctx, r.components) if err != nil { return err } r.components = nil return nil } func shutdownComponents(ctx context.Context, components []component.Component) error { // Shutdown them in reverse order as they were created. components = slices.Clone(components) slices.Reverse(components) for _, c := range components { err := c.Shutdown(ctx) if err != nil { return fmt.Errorf("failed to shutdown component %s: %w", c, err) } } return nil } // convertComponentConfig merges the raw configuration received from the integration // with the configuration object returned by `create`. `create` is expected to be the // `CreateDefaultConfig` of a component factory. func convertComponentConfig(create func() component.Config, config component.Config) (component.Config, error) { received := confmap.New() err := received.Marshal(config) if err != nil { return nil, err } prepared := create() err = received.Unmarshal(prepared) return prepared, err }