processor/schemaprocessor/processor.go (174 lines of code) (raw):

// Copyright The OpenTelemetry Authors // SPDX-License-Identifier: Apache-2.0 package schemaprocessor // import "github.com/open-telemetry/opentelemetry-collector-contrib/processor/schemaprocessor" import ( "context" "errors" "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/pdata/plog" "go.opentelemetry.io/collector/pdata/pmetric" "go.opentelemetry.io/collector/pdata/ptrace" "go.opentelemetry.io/collector/processor" "go.uber.org/zap" "github.com/open-telemetry/opentelemetry-collector-contrib/processor/schemaprocessor/internal/translation" ) type schemaProcessor struct { telemetry component.TelemetrySettings config *Config log *zap.Logger manager translation.Manager } func newSchemaProcessor(_ context.Context, conf component.Config, set processor.Settings) (*schemaProcessor, error) { cfg, ok := conf.(*Config) if !ok { return nil, errors.New("invalid configuration provided") } m, err := translation.NewManager( cfg.Targets, set.Logger.Named("schema-manager"), ) if err != nil { return nil, err } return &schemaProcessor{ config: cfg, telemetry: set.TelemetrySettings, log: set.Logger, manager: m, }, nil } func (t schemaProcessor) processLogs(ctx context.Context, ld plog.Logs) (plog.Logs, error) { for rt := 0; rt < ld.ResourceLogs().Len(); rt++ { rLogs := ld.ResourceLogs().At(rt) resourceSchemaURL := rLogs.SchemaUrl() if resourceSchemaURL != "" { t.log.Debug("requesting translation for resourceSchemaURL", zap.String("resourceSchemaURL", resourceSchemaURL)) tr, err := t.manager. RequestTranslation(ctx, resourceSchemaURL) if err != nil { t.log.Error("failed to request translation", zap.Error(err)) return ld, err } err = tr.ApplyAllResourceChanges(rLogs, resourceSchemaURL) if err != nil { t.log.Error("failed to apply resource changes", zap.Error(err)) return ld, err } } for ss := 0; ss < rLogs.ScopeLogs().Len(); ss++ { logs := rLogs.ScopeLogs().At(ss) logsSchemaURL := logs.SchemaUrl() if logsSchemaURL == "" { logsSchemaURL = resourceSchemaURL } if logsSchemaURL == "" { continue } tr, err := t.manager. RequestTranslation(ctx, logsSchemaURL) if err != nil { t.log.Error("failed to request translation", zap.Error(err)) continue } err = tr.ApplyScopeLogChanges(logs, logsSchemaURL) if err != nil { t.log.Error("failed to apply scope log changes", zap.Error(err)) } } } return ld, nil } func (t schemaProcessor) processMetrics(ctx context.Context, md pmetric.Metrics) (pmetric.Metrics, error) { for mt := 0; mt < md.ResourceMetrics().Len(); mt++ { rMetric := md.ResourceMetrics().At(mt) resourceSchemaURL := rMetric.SchemaUrl() if resourceSchemaURL != "" { t.log.Debug("requesting translation for resourceSchemaURL", zap.String("resourceSchemaURL", resourceSchemaURL)) tr, err := t.manager.RequestTranslation(context.Background(), resourceSchemaURL) if err != nil { t.log.Error("failed to request translation", zap.Error(err)) return md, err } err = tr.ApplyAllResourceChanges(rMetric, resourceSchemaURL) if err != nil { t.log.Error("failed to apply resource changes", zap.Error(err)) return md, err } } for sm := 0; sm < rMetric.ScopeMetrics().Len(); sm++ { metric := rMetric.ScopeMetrics().At(sm) metricSchemaURL := metric.SchemaUrl() if metricSchemaURL == "" { metricSchemaURL = resourceSchemaURL } if metricSchemaURL == "" { continue } tr, err := t.manager. RequestTranslation(ctx, metricSchemaURL) if err != nil { t.log.Error("failed to request translation", zap.Error(err)) return md, err } err = tr.ApplyScopeMetricChanges(metric, metricSchemaURL) if err != nil { t.log.Error("failed to apply scope metric changes", zap.Error(err)) return md, err } } } return md, nil } func (t schemaProcessor) processTraces(ctx context.Context, td ptrace.Traces) (ptrace.Traces, error) { for rt := 0; rt < td.ResourceSpans().Len(); rt++ { rTrace := td.ResourceSpans().At(rt) resourceSchemaURL := rTrace.SchemaUrl() if resourceSchemaURL != "" { t.log.Debug("requesting translation for resourceSchemaURL", zap.String("resourceSchemaURL", resourceSchemaURL)) tr, err := t.manager. RequestTranslation(ctx, resourceSchemaURL) if err != nil { t.log.Error("failed to request translation", zap.Error(err)) return td, err } err = tr.ApplyAllResourceChanges(rTrace, resourceSchemaURL) if err != nil { t.log.Error("failed to apply resource changes", zap.Error(err)) return td, err } } for ss := 0; ss < rTrace.ScopeSpans().Len(); ss++ { span := rTrace.ScopeSpans().At(ss) spanSchemaURL := span.SchemaUrl() if spanSchemaURL == "" { spanSchemaURL = resourceSchemaURL } if spanSchemaURL == "" { continue } tr, err := t.manager. RequestTranslation(ctx, spanSchemaURL) if err != nil { t.log.Error("failed to request translation", zap.Error(err)) continue } err = tr.ApplyScopeSpanChanges(span, spanSchemaURL) if err != nil { t.log.Error("failed to apply scope span changes", zap.Error(err)) } } } return td, nil } // start will add HTTP provider to the manager and prefetch schemas func (t *schemaProcessor) start(ctx context.Context, host component.Host) error { client, err := t.config.ToClient(ctx, host, t.telemetry) if err != nil { return err } t.manager.AddProvider(translation.NewHTTPProvider(client)) go func(ctx context.Context) { for _, schemaURL := range t.config.Prefetch { t.log.Info("prefetching schema", zap.String("url", schemaURL)) _, _ = t.manager.RequestTranslation(ctx, schemaURL) } }(ctx) return nil }