pkg/integrations/rawtemplate.go (163 lines of code) (raw):

// Licensed to Elasticsearch B.V. under one or more contributor // license agreements. See the NOTICE file distributed with // this work for additional information regarding copyright // ownership. Elasticsearch B.V. licenses this file to you under // the Apache License, Version 2.0 (the "License"); you may // not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, // software distributed under the License is distributed on an // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY // KIND, either express or implied. See the License for the // specific language governing permissions and limitations // under the License. package integrations // import "github.com/elastic/opentelemetry-collector-components/pkg/integrations" import ( "bytes" "context" "errors" "fmt" "slices" "strings" "gopkg.in/yaml.v3" "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/confmap" "go.opentelemetry.io/collector/pipeline" ) var _ Integration = &RawTemplate{} // RawTemplate implements the Template interface for raw YAML content. // Unused components are removed after resolving it, so the variables they contain are not required. type RawTemplate struct { source rawYAMLConfig } // NewRawTemplate creates a new RawTemplate from raw YAML content. func NewRawTemplate(raw []byte) (*RawTemplate, error) { var source rawYAMLConfig decoder := yaml.NewDecoder(bytes.NewReader(raw)) decoder.KnownFields(true) err := decoder.Decode(&source) if err != nil { return nil, fmt.Errorf("invalid integration template format: %w", err) } if err := source.validate(); err != nil { return nil, fmt.Errorf("integration template validation failed: %w", err) } return &RawTemplate{source: source}, nil } // Resolve resolves the template using a confmap resolver. func (t *RawTemplate) Resolve(ctx context.Context, params map[string]any, pipelines []pipeline.ID) (*Config, error) { selectedPipelines := t.source.Pipelines if len(pipelines) > 0 { var err error selectedPipelines, err = selectComponents(t.source.Pipelines, pipelines) if err != nil { return nil, fmt.Errorf("selecting pipelines: %v", err) } } selectedReceivers, err := selectComponents(t.source.Receivers, listReceivers(selectedPipelines)) if err != nil { return nil, fmt.Errorf("selecting receivers: %v", err) } selectedProcessors, err := selectComponents(t.source.Processors, listProcessors(selectedPipelines)) if err != nil { return nil, fmt.Errorf("selecting processors: %v", err) } rawConfig := rawYAMLConfig{ Pipelines: selectedPipelines, Receivers: selectedReceivers, Processors: selectedProcessors, } resolver, err := newResolver(rawConfig, params) if err != nil { return nil, err } conf, err := resolver.Resolve(ctx) if err != nil { return nil, err } var config Config if err := conf.Unmarshal(&config); err != nil { return nil, fmt.Errorf("failed to generate effective configuration: %w", err) } return &config, config.validate() } func newResolver(rawConfig rawYAMLConfig, variables map[string]any) (*confmap.Resolver, error) { settings := confmap.ResolverSettings{ URIs: []string{"config:main"}, ProviderFactories: []confmap.ProviderFactory{ newConfmapProviderFactory(rawConfig), newVariablesProviderFactory(variables), }, } return confmap.NewResolver(settings) } type selectableID interface { comparable fmt.Stringer } func selectComponents[C any, ID selectableID](from map[ID]C, selection []ID) (map[ID]C, error) { selected := make(map[ID]C, len(selection)) for _, id := range selection { component, found := from[id] if !found { return nil, fmt.Errorf("component %q not found", id) } selected[id] = component } return selected, nil } func listComponents(pipelines map[pipeline.ID]PipelineConfig, listIDs func(PipelineConfig) []component.ID) []component.ID { var list []component.ID for _, pipeline := range pipelines { list = append(list, listIDs(pipeline)...) } slices.SortFunc(list, func(a, b component.ID) int { return strings.Compare(a.String(), b.String()) }) return slices.Compact(list) } // listReceivers lists the IDs of all the receivers found in pipelines func listReceivers(pipelines map[pipeline.ID]PipelineConfig) []component.ID { return listComponents(pipelines, func(c PipelineConfig) []component.ID { if c.Receiver == nil { return []component.ID{} } return []component.ID{*c.Receiver} }) } // listProcessors lists the IDs of all the processors found in pipelines func listProcessors(pipelines map[pipeline.ID]PipelineConfig) []component.ID { return listComponents(pipelines, func(c PipelineConfig) []component.ID { return c.Processors }) } type rawYAMLConfig struct { Receivers map[component.ID]yaml.Node `mapstructure:"receivers" yaml:"receivers,omitempty"` Processors map[component.ID]yaml.Node `mapstructure:"processors" yaml:"processors,omitempty"` Pipelines map[pipeline.ID]PipelineConfig `mapstructure:"pipelines" yaml:"pipelines,omitempty"` } func (c *rawYAMLConfig) validate() error { if len(c.Pipelines) == 0 { return errors.New("missing pipelines") } for _, pipeline := range c.Pipelines { if pipeline.Receiver != nil { if _, found := c.Receivers[*pipeline.Receiver]; !found { return fmt.Errorf("receiver %q not defined", pipeline.Receiver.String()) } } for _, processor := range pipeline.Processors { if _, found := c.Processors[processor]; !found { return fmt.Errorf("processor %q not defined", processor.String()) } } } return nil } const configProviderScheme = "config" func newConfmapProviderFactory(main rawYAMLConfig) confmap.ProviderFactory { return confmap.NewProviderFactory(createRawConfigProvider(main)) } func createRawConfigProvider(main rawYAMLConfig) confmap.CreateProviderFunc { return func(_ confmap.ProviderSettings) confmap.Provider { return &rawConfigProvider{main: main} } } type rawConfigProvider struct { main rawYAMLConfig } func (p *rawConfigProvider) Retrieve(ctx context.Context, uri string, _ confmap.WatcherFunc) (*confmap.Retrieved, error) { if uri != configProviderScheme+":main" { return nil, fmt.Errorf("unexpected use of %s provider, requested uri: %s", configProviderScheme, uri) } main, err := yaml.Marshal(p.main) if err != nil { return nil, fmt.Errorf("failed to encode config: %w", err) } return confmap.NewRetrievedFromYAML(main) } func (p *rawConfigProvider) Scheme() string { return configProviderScheme } func (p *rawConfigProvider) Shutdown(ctx context.Context) error { return nil }