internal/pkg/otel/configtranslate/otelconfig.go (315 lines of code) (raw):
// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
// or more contributor license agreements. Licensed under the Elastic License 2.0;
// you may not use this file except in compliance with the Elastic License 2.0.
package configtranslate
import (
"fmt"
"path/filepath"
"slices"
"strings"
koanfmaps "github.com/knadh/koanf/maps"
otelcomponent "go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/confmap"
"go.opentelemetry.io/collector/pipeline"
"golang.org/x/exp/maps"
elasticsearchtranslate "github.com/elastic/beats/v7/libbeat/otelbeat/oteltranslate/outputs/elasticsearch"
"github.com/elastic/beats/v7/x-pack/filebeat/fbreceiver"
"github.com/elastic/beats/v7/x-pack/libbeat/management"
"github.com/elastic/beats/v7/x-pack/metricbeat/mbreceiver"
"github.com/elastic/elastic-agent-client/v7/pkg/client"
"github.com/elastic/elastic-agent-libs/config"
"github.com/elastic/elastic-agent/internal/pkg/agent/application/info"
"github.com/elastic/elastic-agent/internal/pkg/agent/application/paths"
"github.com/elastic/elastic-agent/pkg/component"
"github.com/elastic/elastic-agent/pkg/component/runtime"
)
// OtelNamePrefix is a prefix we add to all names of Otel entities in the configuration. Its purpose is to avoid
// collisions with user-provided configuration.
const OtelNamePrefix = "_agent-component/"
// BeatMonitoringConfigGetter is a function that returns the monitoring configuration for a beat receiver.
// Within this file it is invoked with the component ID as the first argument and the beat name as the second, and
// the returned map is merged into the receiver configuration.
type BeatMonitoringConfigGetter func(unitID, binary string) map[string]any
// exporterConfigTranslationFunc converts an output configuration into the configuration map of an Otel exporter.
type exporterConfigTranslationFunc func(*config.C) (map[string]any, error)
var (
// OtelSupportedOutputTypes lists the output types a component may use to be considered runnable in an Otel Collector.
OtelSupportedOutputTypes = []string{"elasticsearch"}
// OtelSupportedInputTypes lists the input types a component may use to be considered runnable in an Otel Collector.
OtelSupportedInputTypes = []string{"filestream", "http/metrics", "beat/metrics"}
// configTranslationFuncForExporter maps an exporter type to the function translating output config for it.
configTranslationFuncForExporter = map[otelcomponent.Type]exporterConfigTranslationFunc{
otelcomponent.MustNewType("elasticsearch"): translateEsOutputToExporter,
}
)
// GetOtelConfig builds the Otel collector configuration for the supported components of the given model.
// Every supported component is translated into its own configuration and the results are merged into one.
// All added component and pipeline names are prefixed with OtelNamePrefix.
// Unsupported components are quietly ignored; when no component is supported, a nil configuration and a nil error
// are returned.
func GetOtelConfig(
	model *component.Model,
	info info.Agent,
	beatMonitoringConfigGetter BeatMonitoringConfigGetter,
) (*confmap.Conf, error) {
	supported := getSupportedComponents(model)
	if len(supported) == 0 {
		return nil, nil
	}

	// start from an empty base config and fold each component's config into it
	merged := confmap.New()
	for _, c := range supported {
		cfg, err := getCollectorConfigForComponent(c, info, beatMonitoringConfigGetter)
		if err != nil {
			return nil, err
		}
		// the assumption here is that each component defines its own receivers, while exporters shared between
		// components collapse into a single entry during the merge
		if err := merged.Merge(cfg); err != nil {
			return nil, fmt.Errorf("error merging otel config for component %s: %w", c.ID, err)
		}
	}
	return merged, nil
}
// IsComponentOtelSupported reports whether the given component can be run in an Otel Collector, i.e. whether its
// output type is listed in OtelSupportedOutputTypes and its input type is listed in OtelSupportedInputTypes.
func IsComponentOtelSupported(comp *component.Component) bool {
	if !slices.Contains(OtelSupportedOutputTypes, comp.OutputType) {
		return false
	}
	return slices.Contains(OtelSupportedInputTypes, comp.InputType)
}
// getSupportedComponents returns components from the given model that can be run in an Otel Collector.
// The returned pointers refer to copies of the model's components, not to the elements of model.Components.
func getSupportedComponents(model *component.Model) []*component.Component {
	var supportedComponents []*component.Component
	for i := range model.Components {
		// Copy the element into a variable that is explicitly scoped to this iteration. Taking the address of a
		// range variable only yields distinct pointers with Go 1.22+ loop semantics; with older semantics every
		// stored pointer would alias the same variable and end up pointing at the last component.
		comp := model.Components[i]
		if IsComponentOtelSupported(&comp) {
			supportedComponents = append(supportedComponents, &comp)
		}
	}
	return supportedComponents
}
// getPipelineID returns the pipeline id for the given component. The id combines the component's signal with the
// component ID prefixed by OtelNamePrefix. An error is returned when the signal cannot be determined.
func getPipelineID(comp *component.Component) (pipeline.ID, error) {
	sig, err := getSignalForComponent(comp)
	if err != nil {
		return pipeline.ID{}, err
	}
	return pipeline.NewIDWithName(sig, OtelNamePrefix+comp.ID), nil
}
// getReceiverID returns the receiver id for the given receiver type and unit id. The name part of the id is the
// unit id prefixed with OtelNamePrefix.
func getReceiverID(receiverType otelcomponent.Type, unitID string) otelcomponent.ID {
	return otelcomponent.NewIDWithName(receiverType, OtelNamePrefix+unitID)
}
// getExporterID returns the exporter id for the given exporter type and output name. The name part of the id is the
// output name prefixed with OtelNamePrefix.
func getExporterID(exporterType otelcomponent.Type, outputName string) otelcomponent.ID {
	return otelcomponent.NewIDWithName(exporterType, OtelNamePrefix+outputName)
}
// getCollectorConfigForComponent returns the Otel collector config required to run the given component.
// This function returns a full, valid configuration that can then be merged with configurations for other components.
// The configuration contains the component's receivers and exporters plus a single pipeline wiring them together.
// The receiver and exporter ids listed in the pipeline are sorted, so the same input always yields the same output.
func getCollectorConfigForComponent(
	comp *component.Component,
	info info.Agent,
	beatMonitoringConfigGetter BeatMonitoringConfigGetter,
) (*confmap.Conf, error) {
	exportersConfig, outputQueueConfig, err := getExportersConfigForComponent(comp)
	if err != nil {
		return nil, err
	}
	receiversConfig, err := getReceiversConfigForComponent(comp, info, outputQueueConfig, beatMonitoringConfigGetter)
	if err != nil {
		return nil, err
	}
	pipelineID, err := getPipelineID(comp)
	if err != nil {
		return nil, err
	}

	// maps.Keys yields the keys in an indeterminate order; sort them so that repeated translations of the same
	// component do not produce pipelines that differ only in element order
	exporterIDs := maps.Keys(exportersConfig)
	slices.Sort(exporterIDs)
	receiverIDs := maps.Keys(receiversConfig)
	slices.Sort(receiverIDs)

	pipelinesConfig := map[string]any{
		pipelineID.String(): map[string][]string{
			"exporters": exporterIDs,
			"receivers": receiverIDs,
		},
	}
	fullConfig := map[string]any{
		"receivers": receiversConfig,
		"exporters": exportersConfig,
		"service": map[string]any{
			"pipelines": pipelinesConfig,
		},
	}
	return confmap.NewFromStringMap(fullConfig), nil
}
// getReceiversConfigForComponent returns the receivers configuration for a component. Usually this will be a single
// receiver, but in principle it could be more.
//
// The returned map is keyed by the receiver id and holds the beat configuration of the receiver: the inputs of all
// input units of the component, the otelconsumer output, a per-component data path, logging fields identifying the
// component, the output queue settings (when outputQueueConfig is non-nil) and the monitoring configuration returned
// by beatMonitoringConfigGetter. A nil beatMonitoringConfigGetter is tolerated and simply adds no monitoring
// configuration.
func getReceiversConfigForComponent(
	comp *component.Component,
	info info.Agent,
	outputQueueConfig map[string]any,
	beatMonitoringConfigGetter BeatMonitoringConfigGetter,
) (map[string]any, error) {
	receiverType, err := getReceiverTypeForComponent(comp)
	if err != nil {
		return nil, err
	}
	// this is necessary to convert policy config format to beat config format
	defaultDataStreamType, err := getDefaultDatastreamTypeForComponent(comp)
	if err != nil {
		return nil, err
	}

	// get inputs for all the units
	// we run a single receiver for each component to mirror what beats processes do
	var inputs []map[string]any
	for _, unit := range comp.Units {
		if unit.Type != client.UnitTypeInput {
			continue
		}
		unitInputs, err := getInputsForUnit(unit, info, defaultDataStreamType, comp.InputType)
		if err != nil {
			return nil, err
		}
		inputs = append(inputs, unitInputs...)
	}

	receiverID := getReceiverID(receiverType, comp.ID)
	// Beat config inside a beat receiver is nested under an additional key. Not sure if this simple translation is
	// always safe. We should either ensure this is always the case, or have an explicit mapping.
	beatName := strings.TrimSuffix(receiverType.String(), "receiver")
	beatDataPath := filepath.Join(paths.Run(), comp.ID)
	binaryName := getBeatNameForComponent(comp)
	// the dataset is derived from the binary name, with "-" and "/" replaced by "_"
	dataset := fmt.Sprintf("elastic_agent.%s", strings.ReplaceAll(strings.ReplaceAll(binaryName, "-", "_"), "/", "_"))

	receiverConfig := map[string]any{
		// the output needs to be otelconsumer
		"output": map[string]any{
			"otelconsumer": map[string]any{},
		},
		// just like we do for beats processes, each receiver needs its own data path
		"path": map[string]any{
			"data": beatDataPath,
		},
		// adds additional context on logs emitted by beatreceivers to uniquely identify per component logs
		"logging": map[string]any{
			"with_fields": map[string]any{
				"component": map[string]any{
					"id":      comp.ID,
					"binary":  binaryName,
					"dataset": dataset,
					"type":    comp.InputType,
				},
				"log": map[string]any{
					"source": comp.ID,
				},
			},
		},
	}

	// filebeat expects its inputs under "inputs", metricbeat under "modules"
	switch beatName {
	case "filebeat":
		receiverConfig[beatName] = map[string]any{
			"inputs": inputs,
		}
	case "metricbeat":
		receiverConfig[beatName] = map[string]any{
			"modules": inputs,
		}
	}

	// add the output queue config if present
	if outputQueueConfig != nil {
		receiverConfig["queue"] = outputQueueConfig
	}

	// add monitoring config if necessary; calling a nil getter would panic, so it is skipped instead
	if beatMonitoringConfigGetter != nil {
		monitoringConfig := beatMonitoringConfigGetter(comp.ID, beatName)
		koanfmaps.Merge(monitoringConfig, receiverConfig)
	}

	return map[string]any{
		receiverID.String(): receiverConfig,
	}, nil
}
// getExportersConfigForComponent returns the exporters configuration and queue settings for a component. Usually
// this will be a single exporter, but in principle it could be more.
// Every output unit of the component contributes its exporters to the result. The queue settings returned are the
// ones produced for the last output unit that was processed.
func getExportersConfigForComponent(comp *component.Component) (exporterCfg map[string]any, queueCfg map[string]any, err error) {
	exporterType, err := getExporterTypeForComponent(comp)
	if err != nil {
		return nil, nil, err
	}

	merged := map[string]any{}
	var lastQueue map[string]any
	for _, unit := range comp.Units {
		if unit.Type != client.UnitTypeOutput {
			continue
		}
		fromUnit, unitQueue, convErr := unitToExporterConfig(unit, exporterType, comp.InputType)
		if convErr != nil {
			return nil, nil, convErr
		}
		lastQueue = unitQueue
		for id, cfg := range fromUnit {
			merged[id] = cfg
		}
	}
	return merged, lastQueue, nil
}
// getBeatNameForComponent returns the beat binary name that would be used to run this component.
// The name is the first argument of the command in the component's input spec. An empty string is returned when the
// component has no input spec, is not run by the "agentbeat" binary, or its spec carries no command arguments.
func getBeatNameForComponent(comp *component.Component) string {
	// TODO: Add this information directly to the spec?
	if comp.InputSpec == nil || comp.InputSpec.BinaryName != "agentbeat" {
		return ""
	}
	// NOTE(review): assumes Spec.Command is a pointer that can be nil when the spec has no command section;
	// confirm against the spec type. Without these guards a malformed spec would panic here instead of letting
	// callers report an "unknown" error.
	command := comp.InputSpec.Spec.Command
	if command == nil || len(command.Args) == 0 {
		return ""
	}
	return command.Args[0]
}
// getSignalForComponent returns the otel signal for the given component. Currently, this is always logs, even for
// metricbeat. Components run by any other beat result in an error.
func getSignalForComponent(comp *component.Component) (pipeline.Signal, error) {
	if name := getBeatNameForComponent(comp); name == "filebeat" || name == "metricbeat" {
		return pipeline.SignalLogs, nil
	}
	return pipeline.Signal{}, fmt.Errorf("unknown otel signal for input type: %s", comp.InputType)
}
// getReceiverTypeForComponent returns the receiver type for the given component: the filebeat receiver for
// components run by filebeat and the metricbeat receiver for components run by metricbeat. Any other beat name
// results in an error.
func getReceiverTypeForComponent(comp *component.Component) (otelcomponent.Type, error) {
	name := getBeatNameForComponent(comp)
	if name == "filebeat" {
		return otelcomponent.MustNewType(fbreceiver.Name), nil
	}
	if name == "metricbeat" {
		return otelcomponent.MustNewType(mbreceiver.Name), nil
	}
	return otelcomponent.Type{}, fmt.Errorf("unknown otel receiver type for input type: %s", comp.InputType)
}
// getExporterTypeForComponent returns the exporter type for the given component, based on its output type.
// Only the "elasticsearch" output type is handled; any other output type results in an error.
func getExporterTypeForComponent(comp *component.Component) (otelcomponent.Type, error) {
	if comp.OutputType != "elasticsearch" {
		return otelcomponent.Type{}, fmt.Errorf("unknown otel exporter type for output type: %s", comp.OutputType)
	}
	return otelcomponent.MustNewType("elasticsearch"), nil
}
// unitToExporterConfig translates a component.Unit to return an otel exporter configuration and output queue settings.
//
// The unit must not be an input unit, and a translation function must be registered for exporterType; otherwise an
// error is returned. The exporter configuration is returned as a map keyed by the exporter id. queueSettings is the
// content of the output's "queue" section when that section exists and is a map, and nil in every other case.
func unitToExporterConfig(unit component.Unit, exporterType otelcomponent.Type, inputType string) (exportersCfg map[string]any, queueSettings map[string]any, err error) {
	if unit.Type == client.UnitTypeInput {
		return nil, nil, fmt.Errorf("unit type is an input, expected output: %v", unit)
	}
	configTranslationFunc, ok := configTranslationFuncForExporter[exporterType]
	if !ok {
		return nil, nil, fmt.Errorf("no config translation function for exporter type: %s", exporterType)
	}

	// we'd like to use the same exporter for all outputs with the same name, so we parse out the name for the unit id
	// these will be deduplicated by the configuration merging process at the end
	outputName := strings.TrimPrefix(unit.ID, inputType+"-") // TODO: Use a more structured approach here
	exporterID := getExporterID(exporterType, outputName)

	// translate the configuration
	unitConfigMap := unit.Config.GetSource().AsMap() // this is what beats do in libbeat/management/generate.go
	outputCfgC, err := config.NewConfigFrom(unitConfigMap)
	if err != nil {
		return nil, nil, fmt.Errorf("error translating config for output: %s, unit: %s, error: %w", outputName, unit.ID, err)
	}
	// Config translation function can mutate queue settings defined under output config, so it has to run before the
	// queue settings are read below
	exporterConfig, err := configTranslationFunc(outputCfgC)
	if err != nil {
		return nil, nil, fmt.Errorf("error translating config for output: %s, unit: %s, error: %w", outputName, unit.ID, err)
	}
	exportersCfg = map[string]any{
		exporterID.String(): exporterConfig,
	}

	// If output config contains queue settings defined by user/preset field, it should be promoted to the receiver section
	if outputCfgC.HasField("queue") {
		// Unpack the whole output config into a local map and pick only the "queue" section out of it. Unpacking
		// straight into queueSettings would hand the entire output configuration back to the caller whenever the
		// "queue" field is present but is not a map.
		var outputSettings map[string]any
		if err := outputCfgC.Unpack(&outputSettings); err != nil {
			return nil, nil, fmt.Errorf("error unpacking queue settings for output: %s, unit: %s, error: %w", outputName, unit.ID, err)
		}
		if queue, ok := outputSettings["queue"].(map[string]any); ok {
			queueSettings = queue
		}
	}
	return exportersCfg, queueSettings, nil
}
// getInputsForUnit returns the beat inputs for a unit. These can directly be plugged into a beats receiver config.
// It mainly calls a conversion function from the control protocol client, and then tags every resulting input:
// when inputType contains "/metrics" the input gets a "module" key holding inputType without the "/metrics" suffix,
// otherwise the input gets inputType as its "type" unless a "type" is already set.
func getInputsForUnit(unit component.Unit, info info.Agent, defaultDataStreamType string, inputType string) ([]map[string]any, error) {
	inputs, err := management.CreateInputsFromStreams(
		unit.Config,
		defaultDataStreamType,
		&client.AgentInfo{
			ID:           info.AgentID(),
			Version:      info.Version(),
			Snapshot:     info.Snapshot(),
			ManagedMode:  runtime.ProtoAgentMode(info),
			Unprivileged: info.Unprivileged(),
		},
	)
	if err != nil {
		return nil, err
	}

	// Add the type to each input. CreateInputsFromStreams doesn't do this, each beat does it on its own in a transform
	// function. For filebeat, see: https://github.com/elastic/beats/blob/main/x-pack/filebeat/cmd/agent.go
	// If inputType contains /metrics, use modules to create inputs
	usesModules := strings.Contains(inputType, "/metrics")
	moduleName := strings.TrimSuffix(inputType, "/metrics")
	for _, input := range inputs {
		if usesModules {
			input["module"] = moduleName
			continue
		}
		if _, hasType := input["type"]; !hasType {
			input["type"] = inputType
		}
	}
	return inputs, nil
}
// getDefaultDatastreamTypeForComponent returns the default datastream type for a given component: "logs" for
// components run by filebeat and "metrics" for components run by metricbeat. Any other beat results in an error.
// This is needed to translate from the agent policy config format to the beats config format.
func getDefaultDatastreamTypeForComponent(comp *component.Component) (string, error) {
	name := getBeatNameForComponent(comp)
	if name == "filebeat" {
		return "logs", nil
	}
	if name == "metricbeat" {
		return "metrics", nil
	}
	return "", fmt.Errorf("input type not supported by Otel: %s", comp.InputType)
}
// translateEsOutputToExporter translates an elasticsearch output configuration to an elasticsearch exporter
// configuration. The translated configuration is then adjusted with a fixed set of settings, listed below.
func translateEsOutputToExporter(cfg *config.C) (map[string]any, error) {
	exporterCfg, err := elasticsearchtranslate.ToOTelConfig(cfg)
	if err != nil {
		return nil, err
	}

	overrides := map[string]any{
		// we want to use dynamic indexing; logs_index needs to be empty for logs_dynamic_index
		"logs_index":         "",
		"logs_dynamic_index": map[string]any{"enabled": true},
		// we also want to use dynamic log ids
		"logs_dynamic_id": map[string]any{"enabled": true},
		// for compatibility with beats, we want bodymap mapping
		"mapping": map[string]any{"mode": "bodymap"},
	}
	for key, value := range overrides {
		exporterCfg[key] = value
	}
	return exporterCfg, nil
}