processor/datadogsemanticsprocessor/processor.go (147 lines of code) (raw):

// Copyright The OpenTelemetry Authors // SPDX-License-Identifier: Apache-2.0 package datadogsemanticsprocessor // import "github.com/open-telemetry/opentelemetry-collector-contrib/processor/datadogsemanticsprocessor" import ( "context" "errors" "fmt" "strings" "github.com/DataDog/datadog-agent/pkg/trace/traceutil" "github.com/DataDog/opentelemetry-mapping-go/pkg/otlp/attributes/source" "go.opentelemetry.io/collector/pdata/pcommon" "go.opentelemetry.io/collector/pdata/ptrace" semconv "go.opentelemetry.io/collector/semconv/v1.27.0" ) func insertAttrIfMissing(sattr pcommon.Map, key string, value any) (err error) { if _, ok := sattr.Get(key); !ok { switch v := value.(type) { case string: sattr.PutStr(key, v) case int64: sattr.PutInt(key, v) default: err = errors.New("unsupported value type") } } return } func (tp *tracesProcessor) processTraces(ctx context.Context, td ptrace.Traces) (output ptrace.Traces, err error) { rspans := td.ResourceSpans() for i := 0; i < rspans.Len(); i++ { rspan := rspans.At(i) otelres := rspan.Resource() rattr := otelres.Attributes() for j := 0; j < rspan.ScopeSpans().Len(); j++ { libspans := rspan.ScopeSpans().At(j) for k := 0; k < libspans.Spans().Len(); k++ { otelspan := libspans.Spans().At(k) sattr := otelspan.Attributes() if tp.overrideIncomingDatadogFields { sattr.RemoveIf(func(k string, _ pcommon.Value) bool { return strings.HasPrefix(k, "datadog.") }) if ddHostname, ok := rattr.Get("datadog.host.name"); ok && ddHostname.AsString() == "" { rattr.Remove("datadog.host.name") } } if err = insertAttrIfMissing(sattr, "datadog.service", traceutil.GetOTelService(otelres, true)); err != nil { return ptrace.Traces{}, err } if err = insertAttrIfMissing(sattr, "datadog.name", traceutil.GetOTelOperationNameV2(otelspan)); err != nil { return ptrace.Traces{}, err } if err = insertAttrIfMissing(sattr, "datadog.resource", traceutil.GetOTelResourceV2(otelspan, otelres)); err != nil { return ptrace.Traces{}, err } if err = insertAttrIfMissing(sattr, "datadog.type", traceutil.GetOTelSpanType(otelspan, otelres)); err != nil { return ptrace.Traces{}, err } if src, ok := tp.attrsTranslator.ResourceToSource(ctx, otelres, traceutil.SignalTypeSet, nil); ok && src.Kind == source.HostnameKind { if err = insertAttrIfMissing(otelres.Attributes(), "datadog.host.name", src.Identifier); err != nil { return ptrace.Traces{}, err } } spanKind := otelspan.Kind() if err = insertAttrIfMissing(sattr, "datadog.span.kind", traceutil.OTelSpanKindName(spanKind)); err != nil { return ptrace.Traces{}, err } if env := traceutil.GetOTelEnv(otelres); env != "" { if err = insertAttrIfMissing(sattr, "datadog.env", env); err != nil { return ptrace.Traces{}, err } } if serviceVersion, ok := otelres.Attributes().Get(semconv.AttributeServiceVersion); ok { if err = insertAttrIfMissing(sattr, "datadog.version", serviceVersion.AsString()); err != nil { return ptrace.Traces{}, err } } metaMap := make(map[string]string) code := traceutil.GetOTelStatusCode(otelspan) if code != 0 { if err = insertAttrIfMissing(sattr, "datadog.http_status_code", fmt.Sprintf("%d", code)); err != nil { return ptrace.Traces{}, err } } ddError := int64(status2Error(otelspan.Status(), otelspan.Events(), metaMap)) if err = insertAttrIfMissing(sattr, "datadog.error", ddError); err != nil { return ptrace.Traces{}, err } if metaMap["error.msg"] != "" { if err = insertAttrIfMissing(sattr, "datadog.error.msg", metaMap["error.msg"]); err != nil { return ptrace.Traces{}, err } } if metaMap["error.type"] != "" { if err = insertAttrIfMissing(sattr, "datadog.error.type", metaMap["error.type"]); err != nil { return ptrace.Traces{}, err } } if metaMap["error.stack"] != "" { if err = insertAttrIfMissing(sattr, "datadog.error.stack", metaMap["error.stack"]); err != nil { return ptrace.Traces{}, err } } } } } return td, err } // TODO import this from datadog-agent pending https://github.com/DataDog/datadog-agent/pull/33753 // Status2Error... func status2Error(status ptrace.Status, events ptrace.SpanEventSlice, metaMap map[string]string) int32 { if status.Code() != ptrace.StatusCodeError { return 0 } for i := 0; i < events.Len(); i++ { e := events.At(i) if strings.ToLower(e.Name()) != "exception" { continue } attrs := e.Attributes() if v, ok := attrs.Get(semconv.AttributeExceptionMessage); ok { metaMap["error.msg"] = v.AsString() } if v, ok := attrs.Get(semconv.AttributeExceptionType); ok { metaMap["error.type"] = v.AsString() } if v, ok := attrs.Get(semconv.AttributeExceptionStacktrace); ok { metaMap["error.stack"] = v.AsString() } } if _, ok := metaMap["error.msg"]; !ok { // no error message was extracted, find alternatives if status.Message() != "" { // use the status message metaMap["error.msg"] = status.Message() } else if _, httpcode := getFirstFromMap(metaMap, "http.response.status_code", "http.status_code"); httpcode != "" { // `http.status_code` was renamed to `http.response.status_code` in the HTTP stabilization from v1.23. // See https://opentelemetry.io/docs/specs/semconv/http/migration-guide/#summary-of-changes // http.status_text was removed in spec v0.7.0 (https://github.com/open-telemetry/opentelemetry-specification/pull/972) // TODO (OTEL-1791) Remove this and use a map from status code to status text. if httptext, ok := metaMap["http.status_text"]; ok { metaMap["error.msg"] = fmt.Sprintf("%s %s", httpcode, httptext) } else { metaMap["error.msg"] = httpcode } } } return 1 } // TODO remove once Status2Error is imported from datadog-agent func getFirstFromMap(m map[string]string, keys ...string) (string, string) { for _, key := range keys { if val := m[key]; val != "" { return key, val } } return "", "" }