pkg/integrations/integrations.go (79 lines of code) (raw):

// Licensed to Elasticsearch B.V. under one or more contributor // license agreements. See the NOTICE file distributed with // this work for additional information regarding copyright // ownership. Elasticsearch B.V. licenses this file to you under // the Apache License, Version 2.0 (the "License"); you may // not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, // software distributed under the License is distributed on an // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY // KIND, either express or implied. See the License for the // specific language governing permissions and limitations // under the License. package integrations // import "github.com/elastic/opentelemetry-collector-components/pkg/integrations" import ( "context" "errors" "fmt" "go.uber.org/zap" "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/pipeline" ) // ErrNotFound is the error returned when an integration cannot be found. var ( ErrNotFound = errors.New("not found") ErrNotConfigured = fmt.Errorf("%w: no integration finder extension found", ErrNotFound) ) // Integration is the interface for integrations that can be resolved as configuration. type Integration interface { // Resolve resolves the parametrizable integration. It uses the params to replace placeholders // in the integration, and removes or ignores everything not referenced by the indicated // pipelines. Resolve(ctx context.Context, params map[string]any, pipelines []pipeline.ID) (*Config, error) } // Finder is the interface for extensions that can be used to find integrations by their names. type Finder interface { FindIntegration(ctx context.Context, name string) (Integration, error) } // ExtensionGetter is the interface a host must implement to be used to locate finders // along extensions. // It must be a subset of the component.Host interface. type ExtensionGetter interface { // GetExtensions returns the map of extensions. Only enabled and created extensions will be returned. // GetExtensions can be called by the component anytime after Component.Start() begins and // until Component.Shutdown() ends. GetExtensions() map[component.ID]component.Component } // Compile-time check to ensure that ExtensionGetter is a subset of the component.Host interface. var _ ExtensionGetter = (component.Host)(nil) // Find looks for integrations in extensions of the host that implement the IntegrationFinder interface. // If multiple integrations match, only the first one is returned. // It returns ErrNotFound if the integration cannot be found, or ErrNotConfigured if there are no // configured finder extensions. ErrNotConfigured also wraps ErrNotFound. func Find(ctx context.Context, logger *zap.Logger, host ExtensionGetter, integrationName string) (Integration, error) { if host == nil { logger.Error("received nil host") return nil, ErrNotConfigured } anyExtension := false for eid, extension := range host.GetExtensions() { finder, ok := extension.(Finder) if !ok { continue } anyExtension = true integration, err := finder.FindIntegration(ctx, integrationName) if errors.Is(ErrNotFound, err) { continue } if err != nil { logger.Error("integration finder failed", zap.String("component", eid.String()), zap.Error(err)) return nil, err } return integration, nil } if !anyExtension { return nil, ErrNotConfigured } return nil, ErrNotFound } // Config contains an structured integration. At this point we only handle component IDs, and not // their specific configurations. type Config struct { Receivers map[component.ID]component.Config `mapstructure:"receivers"` Processors map[component.ID]component.Config `mapstructure:"processors"` Pipelines map[pipeline.ID]PipelineConfig `mapstructure:"pipelines"` } // validate validates that the integration configuration is valid and all the components referenced in the // pipelines are defined. func (c *Config) validate() error { if len(c.Pipelines) == 0 { return errors.New("missing pipelines") } for _, pipeline := range c.Pipelines { if pipeline.Receiver != nil { if _, found := c.Receivers[*pipeline.Receiver]; !found { return fmt.Errorf("receiver %s not defined", pipeline.Receiver.String()) } } for _, processor := range pipeline.Processors { if _, found := c.Processors[processor]; !found { return fmt.Errorf("processor %q not defined", processor.String()) } } } return nil } // PipelineConfig contains the definition of a pipeline in the integration. type PipelineConfig struct { // Receiver is the receiver to be used in the pipeline. It is optional, if a pipeline does not define a // receiver it cannot be used to instantiate receivers. Receiver *component.ID `mapstructure:"receiver"` // Processors is the chain of processors of the pipeline, to be used as part of the receiver in receiver // components, or as a combined processor when the pipeline is used as processor. Processors []component.ID `mapstructure:"processors"` }