translator/translate/otel/translate_otel.go (165 lines of code) (raw):
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: MIT
package otel
import (
"errors"
"fmt"
"log"
"time"
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/config/configtelemetry"
"go.opentelemetry.io/collector/confmap"
"go.opentelemetry.io/collector/otelcol"
"go.opentelemetry.io/collector/pipeline"
"go.opentelemetry.io/collector/service"
"go.opentelemetry.io/collector/service/telemetry"
"go.uber.org/multierr"
"go.uber.org/zap/zapcore"
"github.com/aws/amazon-cloudwatch-agent/translator/context"
"github.com/aws/amazon-cloudwatch-agent/translator/translate/otel/common"
"github.com/aws/amazon-cloudwatch-agent/translator/translate/otel/extension/entitystore"
"github.com/aws/amazon-cloudwatch-agent/translator/translate/otel/extension/server"
pipelinetranslator "github.com/aws/amazon-cloudwatch-agent/translator/translate/otel/pipeline"
"github.com/aws/amazon-cloudwatch-agent/translator/translate/otel/pipeline/applicationsignals"
"github.com/aws/amazon-cloudwatch-agent/translator/translate/otel/pipeline/containerinsights"
"github.com/aws/amazon-cloudwatch-agent/translator/translate/otel/pipeline/containerinsightsjmx"
"github.com/aws/amazon-cloudwatch-agent/translator/translate/otel/pipeline/emf_logs"
"github.com/aws/amazon-cloudwatch-agent/translator/translate/otel/pipeline/host"
"github.com/aws/amazon-cloudwatch-agent/translator/translate/otel/pipeline/jmx"
"github.com/aws/amazon-cloudwatch-agent/translator/translate/otel/pipeline/nop"
"github.com/aws/amazon-cloudwatch-agent/translator/translate/otel/pipeline/prometheus"
"github.com/aws/amazon-cloudwatch-agent/translator/translate/otel/pipeline/xray"
"github.com/aws/amazon-cloudwatch-agent/translator/util/ecsutil"
)
var registry = common.NewTranslatorMap[*common.ComponentTranslators, pipeline.ID]()
func RegisterPipeline(translators ...pipelinetranslator.Translator) {
for _, translator := range translators {
registry.Set(translator)
}
}
// Translate converts a JSON config into an OTEL config.
func Translate(jsonConfig interface{}, os string) (*otelcol.Config, error) {
m, ok := jsonConfig.(map[string]interface{})
if !ok {
return nil, errors.New("invalid json config")
}
conf := confmap.NewFromStringMap(m)
if conf.IsSet("csm") {
log.Printf("W! CSM has already been deprecated")
}
translators := common.NewTranslatorMap[*common.ComponentTranslators, pipeline.ID]()
metricsHostTranslators, err := host.NewTranslators(conf, host.MetricsKey, os)
if err != nil {
return nil, err
}
translators.Merge(metricsHostTranslators)
logsHostTranslators, err := host.NewTranslators(conf, host.LogsKey, os)
if err != nil {
return nil, err
}
translators.Merge(logsHostTranslators)
containerInsightsTranslators := containerinsights.NewTranslators(conf)
translators.Merge(containerInsightsTranslators)
translators.Set(applicationsignals.NewTranslator(pipeline.SignalTraces))
translators.Set(applicationsignals.NewTranslator(pipeline.SignalMetrics))
translators.Merge(prometheus.NewTranslators(conf))
translators.Set(emf_logs.NewTranslator())
translators.Set(xray.NewTranslator())
translators.Set(containerinsightsjmx.NewTranslator())
translators.Merge(jmx.NewTranslators(conf))
translators.Merge(registry)
pipelines, err := pipelinetranslator.NewTranslator(translators).Translate(conf)
if err != nil {
translators.Set(nop.NewTranslator())
pipelines, err = pipelinetranslator.NewTranslator(translators).Translate(conf)
if err != nil {
return nil, err
}
}
// ECS is not in scope for entity association, so we only add the entity store in non ECS platforms
if !ecsutil.GetECSUtilSingleton().IsECS() {
pipelines.Translators.Extensions.Set(entitystore.NewTranslator())
}
if context.CurrentContext().KubernetesMode() != "" {
pipelines.Translators.Extensions.Set(server.NewTranslator())
}
cfg := &otelcol.Config{
Receivers: map[component.ID]component.Config{},
Exporters: map[component.ID]component.Config{},
Processors: map[component.ID]component.Config{},
Extensions: map[component.ID]component.Config{},
Service: service.Config{
Telemetry: telemetry.Config{
Logs: getLoggingConfig(conf),
Metrics: telemetry.MetricsConfig{Level: configtelemetry.LevelNone},
Traces: telemetry.TracesConfig{Level: configtelemetry.LevelNone},
},
Pipelines: pipelines.Pipelines,
Extensions: pipelines.Translators.Extensions.Keys(),
},
}
if err = build(conf, cfg, pipelines.Translators); err != nil {
return nil, fmt.Errorf("unable to build components in pipeline: %w", err)
}
if err = cfg.Validate(); err != nil {
return nil, fmt.Errorf("invalid otel config: %w", err)
}
return cfg, nil
}
// parseAgentLogLevel returns the logging level from the JSON config, or the
// default value.
func parseAgentLogLevel(conf *confmap.Conf) zapcore.Level {
// "quiet" takes precedence over "debug" in Telegraf.
v, _ := common.GetBool(conf, common.ConfigKey("agent", "quiet"))
if v {
return zapcore.ErrorLevel
}
v, _ = common.GetBool(conf, common.ConfigKey("agent", "debug"))
if v {
return zapcore.DebugLevel
}
return zapcore.InfoLevel
}
// getLoggingConfig uses the given JSON config to determine the correct
// logging configuration that should go in the YAML.
func getLoggingConfig(conf *confmap.Conf) telemetry.LogsConfig {
var outputPaths []string
filename := context.CurrentContext().GetAgentLogFile()
// A slice with an empty string causes OTEL issues, so avoid it.
if filename != "" {
outputPaths = []string{filename}
}
logLevel := parseAgentLogLevel(conf)
return telemetry.LogsConfig{
OutputPaths: outputPaths,
Level: logLevel,
Encoding: common.Console,
// enabled by default with 10 second tick
Sampling: &telemetry.LogsSamplingConfig{
Enabled: true,
Initial: 2,
Thereafter: 500,
Tick: 10 * time.Second,
},
}
}
// build uses the pipelines and extensions defined in the config to build the components.
func build(conf *confmap.Conf, cfg *otelcol.Config, translators common.ComponentTranslators) error {
errs := buildComponents(conf, cfg.Service.Extensions, cfg.Extensions, translators.Extensions.Get)
for _, p := range cfg.Service.Pipelines {
errs = multierr.Append(errs, buildComponents(conf, p.Receivers, cfg.Receivers, translators.Receivers.Get))
errs = multierr.Append(errs, buildComponents(conf, p.Processors, cfg.Processors, translators.Processors.Get))
errs = multierr.Append(errs, buildComponents(conf, p.Exporters, cfg.Exporters, translators.Exporters.Get))
}
return errs
}
// buildComponents attempts to translate a component for each ID in the set.
func buildComponents[C component.Config, ID common.TranslatorID](
conf *confmap.Conf,
ids []ID,
components map[ID]C,
getTranslator func(ID) (common.Translator[C, ID], bool),
) error {
var errs error
for _, id := range ids {
translator, ok := getTranslator(id)
if !ok {
errs = multierr.Append(errs, fmt.Errorf("missing translator for %v", id.Name()))
continue
}
cfg, err := translator.Translate(conf)
if err != nil {
errs = multierr.Append(errs, err)
continue
}
components[id] = cfg
}
return errs
}