processor/integrationprocessor/processor.go (171 lines of code) (raw):

// Licensed to Elasticsearch B.V. under one or more contributor // license agreements. See the NOTICE file distributed with // this work for additional information regarding copyright // ownership. Elasticsearch B.V. licenses this file to you under // the Apache License, Version 2.0 (the "License"); you may // not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, // software distributed under the License is distributed on an // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY // KIND, either express or implied. See the License for the // specific language governing permissions and limitations // under the License. package integrationprocessor // import "github.com/elastic/opentelemetry-collector-components/processor/integrationprocessor" import ( "context" "fmt" "slices" "go.uber.org/zap" "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/confmap" "go.opentelemetry.io/collector/consumer" "go.opentelemetry.io/collector/pdata/plog" "go.opentelemetry.io/collector/pdata/pmetric" "go.opentelemetry.io/collector/pdata/ptrace" "go.opentelemetry.io/collector/pipeline" "go.opentelemetry.io/collector/processor" "github.com/elastic/opentelemetry-collector-components/pkg/integrations" ) type integrationProcessor struct { params processor.Settings config *Config components []component.Component capabilities consumer.Capabilities nextMetricsConsumer consumer.Metrics nextLogsConsumer consumer.Logs nextTracesConsumer consumer.Traces metrics consumer.Metrics logs consumer.Logs traces consumer.Traces } func newTemplateLogsProcessor(params processor.Settings, config *Config, consumer consumer.Logs) *integrationProcessor { return &integrationProcessor{ params: params, config: config, nextLogsConsumer: consumer, } } func newTemplateMetricsProcessor(params processor.Settings, config *Config, consumer consumer.Metrics) *integrationProcessor { return &integrationProcessor{ params: params, config: config, nextMetricsConsumer: consumer, } } func newTemplateTracesProcessor(params processor.Settings, config *Config, consumer consumer.Traces) *integrationProcessor { return &integrationProcessor{ params: params, config: config, nextTracesConsumer: consumer, } } // factoryGetter is an interface that the component.Host passed to processorcreator's Start function must implement // GetFactory is optional in hosts since 107.0, but we require it. type factoryGetter interface { component.Host GetFactory(component.Kind, component.Type) component.Factory } // Start creates and starts a processor, composed by a chain of other processors. func (r *integrationProcessor) Start(ctx context.Context, ch component.Host) error { host, ok := ch.(factoryGetter) if !ok { return fmt.Errorf("integrationprocessor is not compatible with the provided component.Host") } integration, err := integrations.Find(ctx, r.params.Logger, host, r.config.Name) if err != nil { return fmt.Errorf("failed to find integration %q: %w", r.config.Name, err) } config, err := integration.Resolve(ctx, r.config.Parameters, []pipeline.ID{r.config.Pipeline}) if err != nil { return fmt.Errorf("failed to build pipeline for integration %q: %w", r.config.Name, err) } // Confidence checks, this config should only have the requested pipeline. if n := len(config.Pipelines); n != 1 { return fmt.Errorf("exactly one pipeline expected, found %d", n) } pipeline, found := config.Pipelines[r.config.Pipeline] if !found { return fmt.Errorf("expected pipeline %q not found", r.config.Pipeline) } err = r.startPipeline(ctx, host, *config, r.config.Pipeline, pipeline) if err != nil { // Shutdown components that had been already started for cleanup. if err := r.Shutdown(ctx); err != nil { r.params.Logger.Warn("Failed to shutdown all components on error while starting", zap.String("error", err.Error())) } return fmt.Errorf("failed to start pipeline %q: %w", r.config.Pipeline, err) } return nil } func (r *integrationProcessor) startPipeline(ctx context.Context, host factoryGetter, config integrations.Config, pipelineID pipeline.ID, pipeline integrations.PipelineConfig) error { r.logs = r.nextLogsConsumer r.metrics = r.nextMetricsConsumer r.traces = r.nextTracesConsumer processors := slices.Clone(pipeline.Processors) slices.Reverse(processors) for i, id := range processors { processorConfig, found := config.Processors[id] if !found { return fmt.Errorf("processor %q not found", id) } factory, ok := host.GetFactory(component.KindProcessor, id.Type()).(processor.Factory) if !ok { return fmt.Errorf("could not find processor factory for %q", id.Type()) } config, err := convertComponentConfig(factory.CreateDefaultConfig, processorConfig) if err != nil { return fmt.Errorf("could not compose processor config for %s: %w", id.String(), err) } params := processor.Settings(r.params) params.ID = component.NewIDWithName(factory.Type(), fmt.Sprintf("%s-%s-%d", r.params.ID, pipelineID, i)) params.Logger = params.Logger.With(zap.String("name", params.ID.String())) if r.logs != nil { logs, err := factory.CreateLogs(ctx, params, config, r.logs) if err != nil { return fmt.Errorf("failed to create logs processor %s: %w", params.ID, err) } r.logs = logs r.components = append(r.components, logs) } if r.metrics != nil { metrics, err := factory.CreateMetrics(ctx, params, config, r.metrics) if err != nil { return fmt.Errorf("failed to create metrics processor %s: %w", params.ID, err) } r.metrics = metrics r.components = append(r.components, metrics) } if r.traces != nil { traces, err := factory.CreateTraces(ctx, params, config, r.traces) if err != nil { return fmt.Errorf("failed to create traces processor %s: %w", params.ID, err) } r.traces = traces r.components = append(r.components, traces) } } for _, component := range r.components { err := component.Start(ctx, host) if err != nil { return fmt.Errorf("failed to start component %q: %w", component, err) } } return nil } func (r *integrationProcessor) Shutdown(ctx context.Context) error { // Shutdown them in reverse order as they were created. components := slices.Clone(r.components) slices.Reverse(r.components) for _, c := range components { err := c.Shutdown(ctx) if err != nil { return fmt.Errorf("failed to shutdown component %s: %w", c, err) } } return nil } func (r *integrationProcessor) Capabilities() consumer.Capabilities { return r.capabilities } func (r *integrationProcessor) ConsumeLogs(ctx context.Context, logs plog.Logs) error { return r.logs.ConsumeLogs(ctx, logs) } func (r *integrationProcessor) ConsumeMetrics(ctx context.Context, metrics pmetric.Metrics) error { return r.metrics.ConsumeMetrics(ctx, metrics) } func (r *integrationProcessor) ConsumeTraces(ctx context.Context, traces ptrace.Traces) error { return r.traces.ConsumeTraces(ctx, traces) } // convertComponentConfig merges the raw configuration received from the integration // with the configuration object returned by `create`. `create` is expected to be the // `CreateDefaultConfig` of a component factory. func convertComponentConfig(create func() component.Config, config component.Config) (component.Config, error) { received := confmap.New() err := received.Marshal(config) if err != nil { return nil, err } prepared := create() err = received.Unmarshal(prepared) return prepared, err }