input/otlp/traces.go (1,203 lines of code) (raw):
// Licensed to Elasticsearch B.V. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
// Portions copied from OpenTelemetry Collector (contrib), from the
// elastic exporter.
//
// Copyright 2020, OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package otlp
import (
"context"
"encoding/hex"
"fmt"
"math"
"net"
"net/url"
"slices"
"strconv"
"strings"
"time"
"go.opentelemetry.io/collector/pdata/pcommon"
"go.opentelemetry.io/collector/pdata/ptrace"
semconv12 "go.opentelemetry.io/collector/semconv/v1.12.0"
semconv16 "go.opentelemetry.io/collector/semconv/v1.16.0"
semconv18 "go.opentelemetry.io/collector/semconv/v1.18.0"
semconv25 "go.opentelemetry.io/collector/semconv/v1.25.0"
semconv "go.opentelemetry.io/collector/semconv/v1.27.0"
"google.golang.org/grpc/codes"
"github.com/elastic/apm-data/model/modelpb"
)
const (
	// keywordLength is 1024. Presumably the maximum length enforced by
	// truncate on keyword values; truncate is defined outside this
	// view — TODO confirm.
	keywordLength = 1024

	// dot and underscore are presumably the characters swapped by
	// replaceDots when attribute keys are turned into label keys;
	// replaceDots is defined outside this view — TODO confirm.
	dot        = "."
	underscore = "_"

	// outcome* are the values assigned to, and compared against,
	// event.Event.Outcome by the translation functions in this file.
	outcomeSuccess = "success"
	outcomeFailure = "failure"
	outcomeUnknown = "unknown"

	// attributeHTTPPath is an attribute key, not taken from the imported
	// semconv packages, that TranslateTransaction treats as the HTTP URL.
	attributeHTTPPath = "http.path"
	// attributeDbElasticsearchClusterName is an attribute key that
	// TranslateSpan maps to the database instance.
	attributeDbElasticsearchClusterName = "db.elasticsearch.cluster.name"
	// attributeDataStreamDataset and attributeDataStreamNamespace are the
	// attribute keys whose sanitized values populate event.DataStream.
	attributeDataStreamDataset   = "data_stream.dataset"
	attributeDataStreamNamespace = "data_stream.namespace"
)
// ConsumeTracesResult contains the number of rejected spans and error message for partial success response.
type ConsumeTracesResult struct {
	// ErrorMessage is the error message for a partial success response.
	ErrorMessage string
	// RejectedSpans is the number of spans that were rejected.
	RejectedSpans int64
}
// ConsumeTraces forwards to ConsumeTracesWithResult and discards the
// returned result, keeping only the error.
//
// It is present so that Consumer satisfies the
// go.opentelemetry.io/collector/consumer.Traces interface.
func (c *Consumer) ConsumeTraces(ctx context.Context, traces ptrace.Traces) error {
	if _, err := c.ConsumeTracesWithResult(ctx, traces); err != nil {
		return err
	}
	return nil
}
// ConsumeTracesWithResult converts the given OpenTelemetry traces into
// Elastic APM events and hands the resulting batch to the configured
// batch processor.
//
// A slot is acquired from the consumer's semaphore before any work is done
// and released on return. An error from acquiring the slot or from the
// batch processor is returned as-is, together with a zero result. On
// success the result reports zero rejected spans.
func (c *Consumer) ConsumeTracesWithResult(ctx context.Context, traces ptrace.Traces) (ConsumeTracesResult, error) {
	var result ConsumeTracesResult

	if err := semAcquire(ctx, c.sem, 1); err != nil {
		return result, err
	}
	defer c.sem.Release(1)

	// A single receive time is shared by every event in this request.
	receivedAt := time.Now()

	rss := traces.ResourceSpans()
	count := rss.Len()
	events := make(modelpb.Batch, 0, count)
	for idx := 0; idx < count; idx++ {
		c.convertResourceSpans(rss.At(idx), receivedAt, &events)
	}

	if err := c.config.Processor.ProcessBatch(ctx, &events); err != nil {
		return result, err
	}
	return result, nil
}
// convertResourceSpans converts every scope-spans group of one resource into
// APM events appended to out.
//
// A base event is built once per resource: it records the receive time and
// the metadata translated from the resource, and is passed to
// convertScopeSpans for each group. When the resource reports an export
// timestamp, the difference between the receive time and that timestamp is
// passed along as the time delta; otherwise the delta is zero.
func (c *Consumer) convertResourceSpans(
	resourceSpans ptrace.ResourceSpans,
	receiveTimestamp time.Time,
	out *modelpb.Batch,
) {
	baseEvent := modelpb.APMEvent{
		Event: &modelpb.Event{Received: modelpb.FromTime(receiveTimestamp)},
	}

	res := resourceSpans.Resource()
	translateResourceMetadata(res, &baseEvent)

	var delta time.Duration
	if exportedAt, found := exportTimestamp(res); found {
		delta = receiveTimestamp.Sub(exportedAt)
	}

	groups := resourceSpans.ScopeSpans()
	for idx, count := 0, groups.Len(); idx < count; idx++ {
		c.convertScopeSpans(groups.At(idx), &baseEvent, delta, out)
	}
}
// convertScopeSpans converts each span of one instrumentation scope by
// calling convertSpan with the scope, the shared base event and the time
// delta; the resulting events are appended to out.
func (c *Consumer) convertScopeSpans(
	in ptrace.ScopeSpans,
	baseEvent *modelpb.APMEvent,
	timeDelta time.Duration,
	out *modelpb.Batch,
) {
	scope := in.Scope()
	spans := in.Spans()
	for idx, count := 0, spans.Len(); idx < count; idx++ {
		c.convertSpan(spans.At(idx), scope, baseEvent, timeDelta, out)
	}
}
// convertSpan converts one OTLP span into an APM event, appends it to out,
// and then appends one further event for each span event recorded on the
// span.
//
// The span becomes a transaction when it has no parent span ID, or when its
// kind is server or consumer; otherwise it becomes a span. baseEvent is
// cloned and never modified. timeDelta is added to the span's start
// timestamp (and passed on for span event timestamps).
func (c *Consumer) convertSpan(
	otelSpan ptrace.Span,
	otelLibrary pcommon.InstrumentationScope,
	baseEvent *modelpb.APMEvent,
	timeDelta time.Duration,
	out *modelpb.Batch,
) {
	root := otelSpan.ParentSpanID().IsEmpty()
	var parentID string
	if !root {
		parentID = hexSpanID(otelSpan.ParentSpanID())
	}

	startTime := otelSpan.StartTimestamp().AsTime()
	endTime := otelSpan.EndTimestamp().AsTime()
	duration := endTime.Sub(startTime)
	if duration < 0 {
		// An end timestamp earlier than the start timestamp (for example
		// an unset end timestamp) would otherwise wrap around to a huge
		// value in the unsigned conversion below.
		duration = 0
	}

	name := otelSpan.Name()
	spanID := hexSpanID(otelSpan.SpanID())
	representativeCount := getRepresentativeCountFromTracestateHeader(otelSpan.TraceState().AsRaw())

	event := baseEvent.CloneVT()
	translateScopeMetadata(otelLibrary, event)
	initEventLabels(event)
	event.Timestamp = modelpb.FromTime(startTime.Add(timeDelta))
	if id := hexTraceID(otelSpan.TraceID()); id != "" {
		event.Trace = &modelpb.Trace{}
		event.Trace.Id = id
	}
	if event.Event == nil {
		event.Event = &modelpb.Event{}
	}
	event.Event.Duration = uint64(duration)
	event.Event.Outcome = spanStatusOutcome(otelSpan.Status())
	if parentID != "" {
		event.ParentId = parentID
	}

	// Message consumption results in either a transaction or a span based
	// on whether the consumption is active or passive. Otel spans
	// currently do not have the metadata to make this distinction. For
	// now, we assume that the majority of consumption is passive, and
	// therefore start a transaction whenever span kind == consumer.
	if root || otelSpan.Kind() == ptrace.SpanKindServer || otelSpan.Kind() == ptrace.SpanKindConsumer {
		event.Transaction = &modelpb.Transaction{}
		event.Transaction.Id = spanID
		event.Transaction.Name = name
		event.Transaction.Sampled = true
		event.Transaction.RepresentativeCount = representativeCount
		if spanID != "" {
			event.Span = &modelpb.Span{}
			event.Span.Id = spanID
		}

		TranslateTransaction(otelSpan.Attributes(), otelSpan.Status(), otelLibrary, event)
	} else {
		event.Span = &modelpb.Span{}
		event.Span.Id = spanID
		event.Span.Name = name
		event.Span.RepresentativeCount = representativeCount
		TranslateSpan(otelSpan.Kind(), otelSpan.Attributes(), event)
	}
	translateSpanLinks(event, otelSpan.Links())

	// Drop empty label maps rather than emitting them.
	if len(event.Labels) == 0 {
		event.Labels = nil
	}
	if len(event.NumericLabels) == 0 {
		event.NumericLabels = nil
	}
	*out = append(*out, event)

	events := otelSpan.Events()
	if events.Len() == 0 {
		// No span events: avoid cloning a parent template nobody will use.
		return
	}

	// Build the template that each span event is derived from.
	event = event.CloneVT()
	event.Labels = baseEvent.Labels               // only copy common labels to span events
	event.NumericLabels = baseEvent.NumericLabels // only copy common labels to span events
	event.Event = &modelpb.Event{}
	event.Event.Received = baseEvent.Event.Received // only copy event.received to span events
	event.Destination = nil                         // don't set destination for span events
	for i := 0; i < events.Len(); i++ {
		*out = append(*out, c.convertSpanEvent(events.At(i), event, timeDelta))
	}
}
// TranslateTransaction converts incoming otlp/otel trace data into the
// expected elasticsearch format.
//
// It walks the span attributes once, mapping recognised keys onto fields of
// event (HTTP, URL, source/client, network, messaging, session, data stream,
// ...) and storing everything else as labels. Map-valued attributes are
// dropped. After the walk it derives the transaction type, HTTP outcome and
// result, URL, message queue, client, and sampling information.
//
// event.Transaction and event.Event must be non-nil; event.Labels is
// written to directly for several string attributes. spanStatus supplies
// the transaction result when no attribute did. library is not used by
// this function.
func TranslateTransaction(
	attributes pcommon.Map,
	spanStatus ptrace.Status,
	library pcommon.InstrumentationScope,
	event *modelpb.APMEvent,
) {
	// Nil-safe getters: event.Agent may not be populated by every caller.
	isJaeger := strings.HasPrefix(event.GetAgent().GetName(), "Jaeger")

	var (
		netHostName string
		netHostPort int
	)

	var (
		httpScheme     string
		httpURL        string
		httpServerName string
		httpHost       string
		http           modelpb.HTTP
		httpRequest    modelpb.HTTPRequest
		httpResponse   modelpb.HTTPResponse
		urlPath        string
		urlQuery       string
	)

	var isHTTP, isRPC, isMessaging bool
	var messagingQueueName string

	var samplerType, samplerParam pcommon.Value
	attributes.Range(func(kDots string, v pcommon.Value) bool {
		if isJaeger {
			// Jaeger sampler attributes are consumed after the walk and
			// never stored as plain labels here.
			switch kDots {
			case "sampler.type":
				samplerType = v
				return true
			case "sampler.param":
				samplerParam = v
				return true
			}
		}

		k := replaceDots(kDots)
		switch v.Type() {
		case pcommon.ValueTypeSlice:
			switch kDots {
			case "elastic.profiler_stack_trace_ids":
				var vSlice = v.Slice()
				event.Transaction.ProfilerStackTraceIds = slices.Grow(event.Transaction.ProfilerStackTraceIds, vSlice.Len())
				for i := 0; i < vSlice.Len(); i++ {
					var idVal = vSlice.At(i)
					// Non-string elements are skipped.
					if idVal.Type() == pcommon.ValueTypeStr {
						event.Transaction.ProfilerStackTraceIds = append(event.Transaction.ProfilerStackTraceIds, idVal.Str())
					}
				}
			default:
				setLabel(k, event, v)
			}
		case pcommon.ValueTypeInt:
			switch kDots {
			case semconv25.AttributeHTTPStatusCode, semconv.AttributeHTTPResponseStatusCode:
				isHTTP = true
				httpResponse.StatusCode = uint32(v.Int())
				http.Response = &httpResponse
			case semconv25.AttributeNetPeerPort, semconv.AttributeClientPort:
				if event.Source == nil {
					event.Source = &modelpb.Source{}
				}
				event.Source.Port = uint32(v.Int())
			case semconv25.AttributeNetHostPort, semconv.AttributeServerPort:
				netHostPort = int(v.Int())
			case semconv.AttributeRPCGRPCStatusCode:
				isRPC = true
				event.Transaction.Result = codes.Code(v.Int()).String()
			default:
				setLabel(k, event, v)
			}
		case pcommon.ValueTypeMap:
			// Map-valued attributes are intentionally ignored.
		case pcommon.ValueTypeStr:
			stringval := truncate(v.Str())
			switch kDots {
			// http.*
			case semconv25.AttributeHTTPMethod, semconv.AttributeHTTPRequestMethod:
				isHTTP = true
				httpRequest.Method = stringval
				http.Request = &httpRequest
			case semconv25.AttributeHTTPURL, semconv.AttributeURLFull, semconv25.AttributeHTTPTarget, attributeHTTPPath:
				isHTTP = true
				httpURL = stringval
			case semconv.AttributeURLPath:
				isHTTP = true
				urlPath = stringval
			case semconv.AttributeURLQuery:
				isHTTP = true
				urlQuery = stringval
			case semconv12.AttributeHTTPHost: //removed after 1.12 (stable)
				isHTTP = true
				httpHost = stringval
			case semconv25.AttributeHTTPScheme, semconv.AttributeURLScheme:
				isHTTP = true
				httpScheme = stringval
			case semconv25.AttributeHTTPStatusCode, semconv.AttributeHTTPResponseStatusCode:
				// Status code sent as a string; ignored when not numeric.
				if intv, err := strconv.Atoi(stringval); err == nil {
					isHTTP = true
					httpResponse.StatusCode = uint32(intv)
					http.Response = &httpResponse
				}
			case "http.protocol":
				if !strings.HasPrefix(stringval, "HTTP/") {
					// Unexpected, store in labels for debugging.
					modelpb.Labels(event.Labels).Set(k, stringval)
					break
				}
				stringval = strings.TrimPrefix(stringval, "HTTP/")
				fallthrough
			case semconv25.AttributeHTTPFlavor: //removed after 1.25 (experimental)
				isHTTP = true
				http.Version = stringval
			case semconv12.AttributeHTTPServerName: //removed after 1.12 (stable)
				isHTTP = true
				httpServerName = stringval
			case semconv18.AttributeHTTPClientIP: //removed after 1.18 (stable)
				if ip, err := modelpb.ParseIP(stringval); err == nil {
					if event.Client == nil {
						event.Client = &modelpb.Client{}
					}
					event.Client.Ip = ip
				}
			case semconv25.AttributeHTTPUserAgent, semconv.AttributeUserAgentOriginal:
				if event.UserAgent == nil {
					event.UserAgent = &modelpb.UserAgent{}
				}
				event.UserAgent.Original = stringval

			// net.*
			case semconv12.AttributeNetPeerIP, semconv25.AttributeNetSockPeerAddr:
				if event.Source == nil {
					event.Source = &modelpb.Source{}
				}
				if ip, err := modelpb.ParseIP(stringval); err == nil {
					event.Source.Ip = ip
				}
			case semconv25.AttributeNetPeerName, semconv.AttributeClientAddress:
				if event.Source == nil {
					event.Source = &modelpb.Source{}
				}
				event.Source.Domain = stringval
			case semconv25.AttributeNetHostName, semconv.AttributeServerAddress:
				netHostName = stringval
			case semconv.AttributeNetworkConnectionType:
				if event.Network == nil {
					event.Network = &modelpb.Network{}
				}
				if event.Network.Connection == nil {
					event.Network.Connection = &modelpb.NetworkConnection{}
				}
				event.Network.Connection.Type = stringval
			case semconv.AttributeNetworkConnectionSubtype:
				if event.Network == nil {
					event.Network = &modelpb.Network{}
				}
				if event.Network.Connection == nil {
					event.Network.Connection = &modelpb.NetworkConnection{}
				}
				event.Network.Connection.Subtype = stringval
			case semconv.AttributeNetworkCarrierMcc:
				if event.Network == nil {
					event.Network = &modelpb.Network{}
				}
				if event.Network.Carrier == nil {
					event.Network.Carrier = &modelpb.NetworkCarrier{}
				}
				event.Network.Carrier.Mcc = stringval
			case semconv.AttributeNetworkCarrierMnc:
				if event.Network == nil {
					event.Network = &modelpb.Network{}
				}
				if event.Network.Carrier == nil {
					event.Network.Carrier = &modelpb.NetworkCarrier{}
				}
				event.Network.Carrier.Mnc = stringval
			case semconv.AttributeNetworkCarrierName:
				if event.Network == nil {
					event.Network = &modelpb.Network{}
				}
				if event.Network.Carrier == nil {
					event.Network.Carrier = &modelpb.NetworkCarrier{}
				}
				event.Network.Carrier.Name = stringval
			case semconv.AttributeNetworkCarrierIcc:
				if event.Network == nil {
					event.Network = &modelpb.Network{}
				}
				if event.Network.Carrier == nil {
					event.Network.Carrier = &modelpb.NetworkCarrier{}
				}
				event.Network.Carrier.Icc = stringval

			// messaging.*
			//
			// messaging.destination is now called messaging.destination.name in the latest semconv
			// https://opentelemetry.io/docs/specs/semconv/attributes-registry/messaging
			// keep both of them for the backward compatibility
			case semconv16.AttributeMessagingDestination, semconv.AttributeMessagingDestinationName, "message_bus.destination":
				isMessaging = true
				messagingQueueName = stringval
			case semconv.AttributeMessagingSystem:
				isMessaging = true
				modelpb.Labels(event.Labels).Set(k, stringval)
			case semconv25.AttributeMessagingOperation, semconv.AttributeMessagingOperationType, semconv.AttributeMessagingOperationName:
				isMessaging = true
				modelpb.Labels(event.Labels).Set(k, stringval)

			// rpc.*
			//
			// TODO(axw) add RPC fieldset to ECS? Currently we drop these
			// attributes, and rely on the operation name like we do with
			// Elastic APM agents.
			case semconv.AttributeRPCSystem:
				isRPC = true
			case semconv.AttributeRPCGRPCStatusCode:
				isRPC = true
			case semconv.AttributeRPCService:
			case semconv.AttributeRPCMethod:

			// miscellaneous
			case "type":
				event.Transaction.Type = stringval
			case "session.id":
				if event.Session == nil {
					event.Session = &modelpb.Session{}
				}
				event.Session.Id = stringval
			case semconv.AttributeServiceVersion:
				// NOTE support for sending service.version as a span tag
				// is deprecated, and will be removed in 8.0. Instrumentation
				// should set this as a resource attribute (OTel) or tracer
				// tag (Jaeger).
				if event.Service == nil {
					event.Service = &modelpb.Service{}
				}
				event.Service.Version = stringval

			// data_stream.*
			case attributeDataStreamDataset:
				if event.DataStream == nil {
					event.DataStream = &modelpb.DataStream{}
				}
				event.DataStream.Dataset = sanitizeDataStreamDataset(stringval)
			case attributeDataStreamNamespace:
				if event.DataStream == nil {
					event.DataStream = &modelpb.DataStream{}
				}
				event.DataStream.Namespace = sanitizeDataStreamNamespace(stringval)
			default:
				modelpb.Labels(event.Labels).Set(k, stringval)
			}
		default:
			setLabel(k, event, v)
		}
		return true
	})

	// An explicit "type" attribute wins; otherwise infer the type from the
	// attribute families that were seen.
	if event.Transaction.Type == "" {
		switch {
		case isMessaging:
			event.Transaction.Type = "messaging"
		case isHTTP, isRPC:
			event.Transaction.Type = "request"
		default:
			event.Transaction.Type = "unknown"
		}
	}

	if isHTTP {
		if http.SizeVT() != 0 {
			event.Http = &http
		}

		// Set outcome and result from status code.
		if statusCode := httpResponse.StatusCode; statusCode > 0 {
			if event.Event.Outcome == outcomeUnknown {
				event.Event.Outcome = serverHTTPStatusCodeOutcome(int(statusCode))
			}
			if event.Transaction.Result == "" {
				event.Transaction.Result = httpStatusCodeResult(int(statusCode))
			}
		}

		// Host precedence: http.host, then http.server_name, then the net
		// host name, then the event's hostname. The port is only appended
		// when the host did not come from http.host.
		if httpHost == "" {
			httpHost = httpServerName
			if httpHost == "" {
				httpHost = netHostName
				if httpHost == "" {
					httpHost = event.GetHost().GetHostname()
				}
			}
			if httpHost != "" && netHostPort > 0 {
				httpHost = net.JoinHostPort(httpHost, strconv.Itoa(netHostPort))
			}
		}

		// Build a relative url from the UrlPath and UrlQuery.
		if httpURL == "" && urlPath != "" {
			httpURL = urlPath
			if urlQuery != "" {
				httpURL += "?" + urlQuery
			}
		}

		// Build the modelpb.URL from http{URL,Host,Scheme}.
		event.Url = modelpb.ParseURL(httpURL, httpHost, httpScheme)
	}

	if isMessaging {
		// Overwrite existing event.Transaction.Message
		event.Transaction.Message = nil
		if messagingQueueName != "" {
			event.Transaction.Message = &modelpb.Message{}
			event.Transaction.Message.QueueName = messagingQueueName
		}
	}

	// Without an explicit client, mirror the source into the client.
	if event.Client == nil && event.Source != nil {
		event.Client = &modelpb.Client{}
		event.Client.Ip = event.Source.Ip
		event.Client.Port = event.Source.Port
		event.Client.Domain = event.Source.Domain
	}

	if samplerType != (pcommon.Value{}) {
		// The client has reported its sampling rate, so we can use it to extrapolate span metrics.
		parseSamplerAttributes(samplerType, samplerParam, event)
	}

	if event.Transaction.Result == "" {
		event.Transaction.Result = spanStatusResult(spanStatus)
	}

	// if outcome and result are still not assigned, assign success
	if event.Event.Outcome == outcomeUnknown {
		event.Event.Outcome = outcomeSuccess
		if event.Transaction.Result == "" {
			event.Transaction.Result = "Success"
		}
	}
}
// TranslateSpan converts incoming otlp/otel trace data into the
// expected elasticsearch format.
//
// It walks the span attributes once, collecting HTTP, database, network,
// messaging, RPC and gen_ai information and storing unrecognised attributes
// as labels. After the walk it derives the destination address and port,
// the HTTP URL, the span type/subtype/action, the destination service and
// the service target. The span type is chosen by the first matching family
// in this order: database, messaging, RPC, HTTP, gen_ai; otherwise it
// falls back to the span kind.
//
// event.Span and event.Event must be non-nil. spanKind is used to default
// the messaging action for producers and to type spans that match no
// attribute family.
func TranslateSpan(spanKind ptrace.SpanKind, attributes pcommon.Map, event *modelpb.APMEvent) {
	isJaeger := strings.HasPrefix(event.GetAgent().GetName(), "Jaeger")

	var (
		netPeerName string
		netPeerIP   string
		netPeerPort int
	)

	var (
		peerService string
		peerAddress string
	)

	var (
		httpURL      string
		httpHost     string
		httpTarget   string
		httpURLPath  string
		httpURLQuery string
		httpScheme   = "http"
	)

	var (
		messageSystem          string
		messageOperation       string
		messageTempDestination bool
	)

	var (
		rpcSystem  string
		rpcService string
	)

	var (
		genAiSystem string
	)

	var http modelpb.HTTP
	var httpRequest modelpb.HTTPRequest
	var httpResponse modelpb.HTTPResponse
	var message modelpb.Message
	var db modelpb.DB
	var destinationService modelpb.DestinationService
	var serviceTarget modelpb.ServiceTarget
	var isHTTP, isDatabase, isRPC, isMessaging, isGenAi bool
	var samplerType, samplerParam pcommon.Value
	attributes.Range(func(kDots string, v pcommon.Value) bool {
		if isJaeger {
			// Jaeger sampler attributes are consumed after the walk and
			// never stored as plain labels here.
			switch kDots {
			case "sampler.type":
				samplerType = v
				return true
			case "sampler.param":
				samplerParam = v
				return true
			}
		}

		k := replaceDots(kDots)
		switch v.Type() {
		case pcommon.ValueTypeBool:
			switch kDots {
			case semconv16.AttributeMessagingTempDestination, semconv.AttributeMessagingDestinationTemporary:
				messageTempDestination = v.Bool()
				// The flag is also kept as a label.
				fallthrough
			default:
				setLabel(k, event, v)
			}
		case pcommon.ValueTypeInt:
			switch kDots {
			case semconv25.AttributeHTTPStatusCode, semconv.AttributeHTTPResponseStatusCode:
				httpResponse.StatusCode = uint32(v.Int())
				http.Response = &httpResponse
				isHTTP = true
			case semconv25.AttributeNetPeerPort, semconv.AttributeServerPort, "peer.port":
				netPeerPort = int(v.Int())
			case semconv.AttributeRPCGRPCStatusCode:
				rpcSystem = "grpc"
				isRPC = true
			default:
				setLabel(k, event, v)
			}
		case pcommon.ValueTypeStr:
			stringval := truncate(v.Str())

			switch kDots {
			// http.*
			case semconv12.AttributeHTTPHost: //removed after 1.12 (stable)
				httpHost = stringval
				isHTTP = true
			case semconv25.AttributeHTTPScheme, semconv.AttributeURLScheme:
				httpScheme = stringval
				isHTTP = true
			case semconv25.AttributeHTTPTarget:
				httpTarget = stringval
				isHTTP = true
			case semconv.AttributeURLPath:
				httpURLPath = stringval
				isHTTP = true
			case semconv.AttributeURLQuery:
				httpURLQuery = stringval
				isHTTP = true
			case semconv25.AttributeHTTPURL, semconv.AttributeURLFull:
				httpURL = stringval
				isHTTP = true
			case semconv25.AttributeHTTPMethod, semconv.AttributeHTTPRequestMethod:
				httpRequest.Method = stringval
				http.Request = &httpRequest
				isHTTP = true

			// db.*
			case "sql.query":
				if db.Type == "" {
					db.Type = "sql"
				}
				fallthrough
			case semconv25.AttributeDBStatement, semconv.AttributeDBQueryText:
				// Statement should not be truncated, use original string value.
				db.Statement = v.Str()
				isDatabase = true
			case semconv25.AttributeDBName, semconv.AttributeDBNamespace, "db.instance", attributeDbElasticsearchClusterName:
				db.Instance = stringval
				isDatabase = true
			case semconv.AttributeDBSystem, "db.type":
				db.Type = stringval
				isDatabase = true
			case semconv25.AttributeDBUser: //removed after 1.25 (experimental)
				db.UserName = stringval
				isDatabase = true

			// net.*
			case semconv25.AttributeNetPeerName, "peer.hostname", semconv.AttributeServerAddress:
				netPeerName = stringval
			case semconv12.AttributeNetPeerIP, semconv25.AttributeNetSockPeerAddr, semconv.AttributeNetworkPeerAddress, "peer.ipv4", "peer.ipv6":
				netPeerIP = stringval
			case "peer.address":
				peerAddress = stringval
			case semconv.AttributeNetworkConnectionType:
				if event.Network == nil {
					event.Network = &modelpb.Network{}
				}
				if event.Network.Connection == nil {
					event.Network.Connection = &modelpb.NetworkConnection{}
				}
				event.Network.Connection.Type = stringval
			case semconv.AttributeNetworkConnectionSubtype:
				if event.Network == nil {
					event.Network = &modelpb.Network{}
				}
				if event.Network.Connection == nil {
					event.Network.Connection = &modelpb.NetworkConnection{}
				}
				event.Network.Connection.Subtype = stringval
			case semconv.AttributeNetworkCarrierMcc:
				if event.Network == nil {
					event.Network = &modelpb.Network{}
				}
				if event.Network.Carrier == nil {
					event.Network.Carrier = &modelpb.NetworkCarrier{}
				}
				event.Network.Carrier.Mcc = stringval
			case semconv.AttributeNetworkCarrierMnc:
				if event.Network == nil {
					event.Network = &modelpb.Network{}
				}
				if event.Network.Carrier == nil {
					event.Network.Carrier = &modelpb.NetworkCarrier{}
				}
				event.Network.Carrier.Mnc = stringval
			case semconv.AttributeNetworkCarrierName:
				if event.Network == nil {
					event.Network = &modelpb.Network{}
				}
				if event.Network.Carrier == nil {
					event.Network.Carrier = &modelpb.NetworkCarrier{}
				}
				event.Network.Carrier.Name = stringval
			case semconv.AttributeNetworkCarrierIcc:
				if event.Network == nil {
					event.Network = &modelpb.Network{}
				}
				if event.Network.Carrier == nil {
					event.Network.Carrier = &modelpb.NetworkCarrier{}
				}
				event.Network.Carrier.Icc = stringval

			// session.*
			case "session.id":
				if event.Session == nil {
					event.Session = &modelpb.Session{}
				}
				event.Session.Id = stringval

			// messaging.*
			//
			// messaging.destination is now called messaging.destination.name in the latest semconv
			// https://opentelemetry.io/docs/specs/semconv/attributes-registry/messaging
			// keep both of them for the backward compatibility
			case semconv16.AttributeMessagingDestination, semconv.AttributeMessagingDestinationName, "message_bus.destination":
				message.QueueName = stringval
				isMessaging = true
			case semconv25.AttributeMessagingOperation, semconv.AttributeMessagingOperationType:
				messageOperation = stringval
				isMessaging = true
			case semconv.AttributeMessagingOperationName:
				modelpb.Labels(event.Labels).Set(k, stringval)
				isMessaging = true
			case semconv.AttributeMessagingSystem:
				messageSystem = stringval
				isMessaging = true

			// rpc.*
			//
			// TODO(axw) add RPC fieldset to ECS? Currently we drop these
			// attributes, and rely on the operation name and span type/subtype
			// like we do with Elastic APM agents.
			case semconv.AttributeRPCSystem:
				rpcSystem = stringval
				isRPC = true
			case semconv.AttributeRPCService:
				rpcService = stringval
				isRPC = true
			case semconv.AttributeRPCGRPCStatusCode:
				rpcSystem = "grpc"
				isRPC = true
			case semconv.AttributeRPCMethod:

			case semconv.AttributeCodeStacktrace:
				if event.Code == nil {
					event.Code = &modelpb.Code{}
				}
				// stacktrace is expected to be large thus un-truncated value is needed
				event.Code.Stacktrace = v.Str()

			// gen_ai.*
			case semconv.AttributeGenAiSystem:
				genAiSystem = stringval
				isGenAi = true

			// miscellaneous
			case "span.kind": // filter out
			case semconv.AttributePeerService:
				peerService = stringval

			// data_stream.*
			case attributeDataStreamDataset:
				if event.DataStream == nil {
					event.DataStream = &modelpb.DataStream{}
				}
				event.DataStream.Dataset = sanitizeDataStreamDataset(stringval)
			case attributeDataStreamNamespace:
				if event.DataStream == nil {
					event.DataStream = &modelpb.DataStream{}
				}
				event.DataStream.Namespace = sanitizeDataStreamNamespace(stringval)
			default:
				setLabel(k, event, v)
			}
		default:
			setLabel(k, event, v)
		}
		return true
	})

	if netPeerName == "" && (!strings.ContainsRune(peerAddress, ':') || net.ParseIP(peerAddress) != nil) {
		// peer.address is not necessarily a hostname
		// or IP address; it could be something like
		// a JDBC connection string or ip:port. Ignore
		// values containing colons, except for IPv6.
		netPeerName = peerAddress
	}

	// Prefer the peer name over the peer IP for the destination address.
	destPort := netPeerPort
	destAddr := netPeerName
	if destAddr == "" {
		destAddr = netPeerIP
	}

	var fullURL *url.URL
	if httpURL != "" {
		// A parse failure leaves fullURL nil; httpURL is still recorded
		// verbatim as the original URL further down.
		fullURL, _ = url.Parse(httpURL)
	} else if httpTarget != "" || httpURLPath != "" {
		// Build http.url from http.scheme, http.target, etc.

		// http.target was supported in older semconv versions, but was replaced by url.path and url.query
		if httpTarget == "" {
			httpTarget = httpURLPath
			if httpURLQuery != "" {
				httpTarget += "?" + httpURLQuery
			}
		}

		if u, err := url.Parse(httpTarget); err == nil {
			fullURL = u
			fullURL.Scheme = httpScheme
			if httpHost == "" {
				// Set host from net.peer.*
				httpHost = destAddr
				if destPort > 0 {
					httpHost = net.JoinHostPort(httpHost, strconv.Itoa(destPort))
				}
			}
			fullURL.Host = httpHost
			httpURL = fullURL.String()
		}
	}
	if fullURL != nil {
		var port int
		portString := fullURL.Port()
		if portString != "" {
			port, _ = strconv.Atoi(portString)
		} else {
			port = schemeDefaultPort(fullURL.Scheme)
		}

		// Set destination.{address,port} from the HTTP URL,
		// replacing peer.* based values to ensure consistency.
		destAddr = truncate(fullURL.Hostname())
		if port > 0 {
			destPort = port
		}
	}

	serviceTarget.Name = peerService
	destinationService.Name = peerService
	destinationService.Resource = peerService
	if peerAddress != "" {
		destinationService.Resource = peerAddress
	}

	if isHTTP {
		if httpResponse.StatusCode > 0 && event.Event.Outcome == outcomeUnknown {
			event.Event.Outcome = clientHTTPStatusCodeOutcome(int(httpResponse.StatusCode))
		}
		if http.SizeVT() != 0 {
			event.Http = &http
		}
		if event.Url == nil {
			event.Url = &modelpb.URL{}
		}
		event.Url.Original = httpURL
	}
	if isDatabase {
		event.Span.Db = &db
	}
	if isMessaging {
		event.Span.Message = &message
	}

	switch {
	case isDatabase:
		event.Span.Type = "db"
		event.Span.Subtype = db.Type
		serviceTarget.Type = event.Span.Type
		if event.Span.Subtype != "" {
			serviceTarget.Type = event.Span.Subtype
			if destinationService.Name == "" {
				// For database requests, we currently just identify
				// the destination service by db.system.
				destinationService.Name = event.Span.Subtype
				destinationService.Resource = event.Span.Subtype
			}
		}
		if db.Instance != "" {
			serviceTarget.Name = db.Instance
		}
	case isMessaging:
		event.Span.Type = "messaging"
		event.Span.Subtype = messageSystem
		if messageOperation == "" && spanKind == ptrace.SpanKindProducer {
			messageOperation = "send"
		}
		event.Span.Action = messageOperation
		serviceTarget.Type = event.Span.Type
		if event.Span.Subtype != "" {
			serviceTarget.Type = event.Span.Subtype
			if destinationService.Name == "" {
				destinationService.Name = event.Span.Subtype
				destinationService.Resource = event.Span.Subtype
			}
		}
		if destinationService.Resource != "" && message.QueueName != "" {
			destinationService.Resource += "/" + message.QueueName
		}
		// Temporary destinations are not used as the service target name.
		if message.QueueName != "" && !messageTempDestination {
			serviceTarget.Name = message.QueueName
		}
	case isRPC:
		event.Span.Type = "external"
		event.Span.Subtype = rpcSystem
		serviceTarget.Type = event.Span.Type
		if event.Span.Subtype != "" {
			serviceTarget.Type = event.Span.Subtype
		}
		// Set destination.service.* from the peer address, unless peer.service was specified.
		if destinationService.Name == "" {
			destHostPort := net.JoinHostPort(destAddr, strconv.Itoa(destPort))
			destinationService.Name = destHostPort
			destinationService.Resource = destHostPort
		}
		if rpcService != "" {
			serviceTarget.Name = rpcService
		}
	case isHTTP:
		event.Span.Type = "external"
		event.Span.Subtype = "http"
		serviceTarget.Type = event.Span.Subtype
		if fullURL != nil {
			// Named serviceURL so the net/url package is not shadowed.
			serviceURL := url.URL{Scheme: fullURL.Scheme, Host: fullURL.Host}
			resource := serviceURL.Host
			if destPort == schemeDefaultPort(serviceURL.Scheme) {
				if fullURL.Port() != "" {
					// Remove the default port from destination.service.name
					serviceURL.Host = destAddr
				} else {
					// Add the default port to destination.service.resource
					resource = fmt.Sprintf("%s:%d", resource, destPort)
				}
			}

			serviceTarget.Name = resource
			if destinationService.Name == "" {
				destinationService.Name = serviceURL.String()
				destinationService.Resource = resource
			}
		}
	case isGenAi:
		event.Span.Type = "genai"
		event.Span.Subtype = genAiSystem
		serviceTarget.Type = event.Span.Type
	default:
		// Only set event.Span.Type if not already set
		if event.Span.Type == "" {
			switch spanKind {
			case ptrace.SpanKindInternal:
				event.Span.Type = "app"
				event.Span.Subtype = "internal"
			default:
				event.Span.Type = "unknown"
			}
		}
	}

	if destAddr != "" {
		event.Destination = &modelpb.Destination{
			Address: destAddr,
			Port:    uint32(destPort),
		}
	}
	if destinationService.SizeVT() != 0 {
		if destinationService.Type == "" {
			// Copy span type to destination.service.type.
			destinationService.Type = event.Span.Type
		}
		event.Span.DestinationService = &destinationService
	}

	if serviceTarget.SizeVT() != 0 {
		// Guard against callers that did not populate event.Service.
		if event.Service == nil {
			event.Service = &modelpb.Service{}
		}
		event.Service.Target = &serviceTarget
	}

	if samplerType != (pcommon.Value{}) {
		// The client has reported its sampling rate, so we can use it to extrapolate transaction metrics.
		parseSamplerAttributes(samplerType, samplerParam, event)
	}

	// if outcome is still not assigned, assign success
	if event.Event.Outcome == outcomeUnknown {
		event.Event.Outcome = outcomeSuccess
	}
}
// parseSamplerAttributes applies the Jaeger "sampler.type" and
// "sampler.param" attribute values to event.
//
// For the "probabilistic" sampler, a parameter in the range (0, 1] sets the
// representative count of the span and/or transaction to its reciprocal;
// any other parameter leaves the counts untouched. For every other sampler
// type the representative counts are reset to zero, the type is recorded
// under the "sampler_type" label, and a bool or double parameter is
// recorded under "sampler_param".
//
// samplerParam may be the zero pcommon.Value when the span carried no
// "sampler.param" attribute; in that case no "sampler_param" label is set.
func parseSamplerAttributes(samplerType, samplerParam pcommon.Value, event *modelpb.APMEvent) {
	switch samplerType := samplerType.Str(); samplerType {
	case "probabilistic":
		probability := samplerParam.Double()
		if probability > 0 && probability <= 1 {
			if event.Span != nil {
				event.Span.RepresentativeCount = 1 / probability
			}
			if event.Transaction != nil {
				event.Transaction.RepresentativeCount = 1 / probability
			}
		}
	default:
		if event.Span != nil {
			event.Span.RepresentativeCount = 0
		}
		if event.Transaction != nil {
			event.Transaction.RepresentativeCount = 0
		}
		modelpb.Labels(event.Labels).Set("sampler_type", samplerType)
		if samplerParam == (pcommon.Value{}) {
			// sampler.param was never reported. Inspecting the type of a
			// zero Value is not safe (it has no backing data), and there
			// is nothing to record anyway.
			return
		}
		switch samplerParam.Type() {
		case pcommon.ValueTypeBool:
			modelpb.Labels(event.Labels).Set("sampler_param", strconv.FormatBool(samplerParam.Bool()))
		case pcommon.ValueTypeDouble:
			modelpb.NumericLabels(event.NumericLabels).Set("sampler_param", samplerParam.Double())
		}
	}
}
// convertSpanEvent converts one OTLP span event into an APM event derived
// from parent.
//
// The result is a clone of parent with the transaction, span, parent ID and
// the service target/origin cleared, and with the span event's timestamp
// shifted by timeDelta. When the parent's agent name starts with "Jaeger"
// the event is passed to convertJaegerErrorSpanEvent; otherwise an event
// named "exception" carrying an exception type or message is translated
// to an error. Anything that does not produce an error becomes a log event
// whose message is the span event name, with its attributes stored as
// labels.
func (c *Consumer) convertSpanEvent(
	spanEvent ptrace.SpanEvent,
	parent *modelpb.APMEvent, // either span or transaction
	timeDelta time.Duration,
) *modelpb.APMEvent {
	event := parent.CloneVT()
	initEventLabels(event)
	event.Transaction = nil // populate fields as required from parent
	event.Span = nil        // populate fields as required from parent
	event.ParentId = ""     // populate fields as required from parent

	// Remove unnecessary fields from span event
	if event.Service != nil {
		event.Service.Target = nil
		event.Service.Origin = nil
	}

	event.Timestamp = modelpb.FromTime(spanEvent.Timestamp().AsTime().Add(timeDelta))

	// Nil-safe getters: parent.Agent may not be populated.
	isJaeger := strings.HasPrefix(parent.GetAgent().GetName(), "Jaeger")
	if isJaeger {
		event.Error = c.convertJaegerErrorSpanEvent(spanEvent, event)
	} else if spanEvent.Name() == "exception" {
		// Translate exception span events to errors.
		//
		// If it's not Jaeger, we assume OpenTelemetry semantic semconv.
		// Per OpenTelemetry semantic conventions:
		//   `The name of the event MUST be "exception"`
		var exceptionEscaped bool
		var exceptionMessage, exceptionStacktrace, exceptionType string
		spanEvent.Attributes().Range(func(k string, v pcommon.Value) bool {
			switch k {
			case semconv.AttributeExceptionMessage:
				exceptionMessage = v.Str()
			case semconv.AttributeExceptionStacktrace:
				exceptionStacktrace = v.Str()
			case semconv.AttributeExceptionType:
				exceptionType = v.Str()
			case "exception.escaped":
				exceptionEscaped = v.Bool()

			// data_stream.*
			// Note: fields are parsed but dataset will be overridden by SetDataStream because it is an error
			case attributeDataStreamDataset:
				if event.DataStream == nil {
					event.DataStream = &modelpb.DataStream{}
				}
				event.DataStream.Dataset = sanitizeDataStreamDataset(v.Str())
			case attributeDataStreamNamespace:
				if event.DataStream == nil {
					event.DataStream = &modelpb.DataStream{}
				}
				event.DataStream.Namespace = sanitizeDataStreamNamespace(v.Str())

			default:
				setLabel(replaceDots(k), event, v)
			}
			return true
		})
		if exceptionMessage != "" || exceptionType != "" {
			// Per OpenTelemetry semantic conventions:
			//   `At least one of the following sets of attributes is required:
			//   - exception.type
			//   - exception.message`
			//
			// The language name is read through nil-safe getters so that a
			// parent without service language metadata yields "" instead
			// of a nil pointer dereference.
			event.Error = convertOpenTelemetryExceptionSpanEvent(
				exceptionType, exceptionMessage, exceptionStacktrace,
				exceptionEscaped, parent.GetService().GetLanguage().GetName(),
			)
		}
	}

	if event.Error != nil {
		setErrorContext(event, parent)
	} else {
		// Set "event.kind" to indicate this is a log event.
		if event.Event == nil {
			event.Event = &modelpb.Event{}
		}
		event.Event.Kind = "event"
		event.Message = spanEvent.Name()
		setLogContext(event, parent)
		spanEvent.Attributes().Range(func(k string, v pcommon.Value) bool {
			switch k {
			// data_stream.*
			case attributeDataStreamDataset:
				if event.DataStream == nil {
					event.DataStream = &modelpb.DataStream{}
				}
				event.DataStream.Dataset = sanitizeDataStreamDataset(v.Str())
			case attributeDataStreamNamespace:
				if event.DataStream == nil {
					event.DataStream = &modelpb.DataStream{}
				}
				event.DataStream.Namespace = sanitizeDataStreamNamespace(v.Str())
			default:
				k = replaceDots(k)
				// For Jaeger, a "message" attribute replaces the event
				// name as the log message.
				if isJaeger && k == "message" {
					event.Message = truncate(v.Str())
					return true
				}
				setLabel(k, event, v)
			}
			return true
		})
	}
	return event
}
// convertJaegerErrorSpanEvent inspects a Jaeger span event and returns the
// error it describes, or nil when the span event is not an error or carries
// neither a log message nor exception details.
//
// Only string-valued attributes are examined; all others are skipped.
// Attributes that are not recognised are recorded as labels on apmEvent, and
// data_stream.* attributes are stored in apmEvent.DataStream.
func (c *Consumer) convertJaegerErrorSpanEvent(event ptrace.SpanEvent, apmEvent *modelpb.APMEvent) *modelpb.Error {
	var exMessage, exType, logMessage string

	// An event named "error" is an error according to the opentracing spec.
	//
	// Otherwise Jaeger seems to send the message in the 'event' field, so the
	// name is used as the log message unless a 'message' attribute overrides
	// it below.
	name := truncate(event.Name())
	isError := name == "error"
	if !isError {
		logMessage = name
	}

	// dataStream returns apmEvent.DataStream, allocating it on first use.
	dataStream := func() *modelpb.DataStream {
		if apmEvent.DataStream == nil {
			apmEvent.DataStream = &modelpb.DataStream{}
		}
		return apmEvent.DataStream
	}

	event.Attributes().Range(func(k string, v pcommon.Value) bool {
		if v.Type() != pcommon.ValueTypeStr {
			return true
		}
		text := truncate(v.Str())
		switch k {
		case "error", "error.object":
			exMessage, isError = text, true
		case "error.kind":
			exType, isError = text, true
		case "level":
			// The level decides on its own, replacing any earlier verdict.
			isError = text == "error"
		case "message":
			logMessage = text

		// data_stream.*
		// Note: fields are parsed but dataset will be overridden by SetDataStream because it is an error
		case attributeDataStreamDataset:
			dataStream().Dataset = sanitizeDataStreamDataset(v.Str())
		case attributeDataStreamNamespace:
			dataStream().Namespace = sanitizeDataStreamNamespace(v.Str())

		default:
			setLabel(replaceDots(k), apmEvent, v)
		}
		return true
	})

	hasException := exMessage != "" || exType != ""
	if !isError || (logMessage == "" && !hasException) {
		return nil
	}

	result := &modelpb.Error{}
	if logMessage != "" {
		result.Log = &modelpb.ErrorLog{Message: logMessage}
	}
	if hasException {
		result.Exception = &modelpb.Exception{Message: exMessage, Type: exType}
		// An ID is only assigned alongside exception details, and only if
		// one could be generated.
		if id, err := newUniqueID(); err == nil {
			result.Id = id
		}
	}
	return result
}
// setErrorContext copies onto the error event out the context it shares with
// parent: the trace ID, the HTTP and URL details and, when parent is a
// transaction, a reduced transaction (ID, sampled flag, type), a span ID
// equal to the transaction ID and the transaction's custom context.
//
// out.ParentId ends up as the parent's span ID when parent has a span, and
// as the transaction ID otherwise. out.Error must be non-nil when parent has
// a transaction.
func setErrorContext(out *modelpb.APMEvent, parent *modelpb.APMEvent) {
	out.Trace.Id = parent.Trace.Id
	out.Http, out.Url = parent.Http, parent.Url

	if tx := parent.Transaction; tx != nil {
		out.Transaction = &modelpb.Transaction{
			Id:      tx.Id,
			Sampled: tx.Sampled,
			Type:    tx.Type,
		}
		out.Span = &modelpb.Span{Id: tx.Id}
		out.Error.Custom = tx.Custom
		out.ParentId = tx.Id
	}

	// A span parent takes precedence for the parent ID.
	if span := parent.Span; span != nil {
		out.ParentId = span.Id
	}
}
// setLogContext links the log event out to parent. If parent is a
// transaction, out receives a transaction and a span that both carry the
// transaction ID. If parent has a span, out.Span is set (or replaced) with
// one carrying the span ID.
func setLogContext(out *modelpb.APMEvent, parent *modelpb.APMEvent) {
	if tx := parent.Transaction; tx != nil {
		out.Transaction = &modelpb.Transaction{Id: tx.Id}
		out.Span = &modelpb.Span{Id: tx.Id}
	}
	if span := parent.Span; span != nil {
		out.Span = &modelpb.Span{Id: span.Id}
	}
}
// translateSpanLinks converts the OTLP span links in into out. Links flagged
// as children are appended to out.ChildIds; all others replace out.Span.Links
// (out.Span is created if needed). Nothing is touched when in is empty.
func translateSpanLinks(out *modelpb.APMEvent, in ptrace.SpanLinkSlice) {
	count := in.Len()
	if count == 0 {
		return
	}
	if out.Span == nil {
		out.Span = &modelpb.Span{}
	}

	// flagged reports whether the named attribute is present and true.
	flagged := func(attrs pcommon.Map, name string) bool {
		value, present := attrs.Get(name)
		return present && value.Bool()
	}

	links := make([]*modelpb.SpanLink, 0, count)
	for i := 0; i < count; i++ {
		link := in.At(i)
		attrs := link.Attributes()
		spanID := hexSpanID(link.SpanID())

		// When a link has the elastic.is_child attribute set, it is stored in
		// the child_ids instead. Alternatively, we also look for just
		// "is_child" without the elastic. prefix.
		if flagged(attrs, "elastic.is_child") || flagged(attrs, "is_child") {
			out.ChildIds = append(out.ChildIds, spanID)
			continue
		}
		links = append(links, &modelpb.SpanLink{
			SpanId:  spanID,
			TraceId: hexTraceID(link.TraceID()),
		})
	}
	out.Span.Links = links
}
// hexSpanID returns the hexadecimal encoding of id, or the empty string when
// id is empty.
func hexSpanID(id pcommon.SpanID) string {
	if !id.IsEmpty() {
		raw := id[:]
		return hex.EncodeToString(raw)
	}
	return ""
}
// hexTraceID returns the hexadecimal encoding of id, or the empty string when
// id is empty.
func hexTraceID(id pcommon.TraceID) string {
	if !id.IsEmpty() {
		raw := id[:]
		return hex.EncodeToString(raw)
	}
	return ""
}
// replaceDots returns s with every occurrence of the package-level dot value
// replaced by the package-level underscore value.
func replaceDots(s string) string {
	const everyOccurrence = -1
	return strings.Replace(s, dot, underscore, everyOccurrence)
}
// spanStatusOutcome maps an OTLP span status onto the outcome recorded for
// transactions and spans: success for Ok, failure for Error, and unknown for
// any other status code.
func spanStatusOutcome(status ptrace.Status) string {
	switch code := status.Code(); code {
	case ptrace.StatusCodeError:
		return outcomeFailure
	case ptrace.StatusCodeOk:
		return outcomeSuccess
	default:
		return outcomeUnknown
	}
}
// spanStatusResult maps an OTLP span status onto a transaction result:
// "Success" for Ok and "Error" for Error. Any other status code produces an
// empty result string.
func spanStatusResult(status ptrace.Status) string {
	code := status.Code()
	if code == ptrace.StatusCodeOk {
		return "Success"
	}
	if code == ptrace.StatusCodeError {
		return "Error"
	}
	return ""
}
// standardStatusCodeResults holds the transaction results for the standard
// HTTP status classes; the entry for class N (status codes Nxx) is at
// index N-1.
var standardStatusCodeResults = [...]string{
	"HTTP 1xx",
	"HTTP 2xx",
	"HTTP 3xx",
	"HTTP 4xx",
	"HTTP 5xx",
}

// httpStatusCodeResult returns the transaction result value to use for the
// given HTTP status code: the class label ("HTTP 2xx", ...) for codes in the
// 100-599 range, and "HTTP <code>" for anything else.
func httpStatusCodeResult(statusCode int) string {
	if class := statusCode / 100; class >= 1 && class <= len(standardStatusCodeResults) {
		return standardStatusCodeResults[class-1]
	}
	return fmt.Sprintf("HTTP %d", statusCode)
}
// serverHTTPStatusCodeOutcome returns the transaction outcome value to use for
// the given HTTP status code.
func serverHTTPStatusCodeOutcome(statusCode int) string {
if statusCode >= 500 {
return outcomeFailure
}
return outcomeSuccess
}
// clientHTTPStatusCodeOutcome returns the span outcome value to use for the
// given HTTP status code.
func clientHTTPStatusCodeOutcome(statusCode int) string {
if statusCode >= 400 {
return outcomeFailure
}
return outcomeSuccess
}
// truncate returns s cut down to at most keywordLength runes. Strings that
// are already within the limit are returned unchanged.
func truncate(s string) string {
	remaining := keywordLength
	// Ranging over a string yields the byte offset of each rune, so the
	// offset of the first rune past the limit is where to cut.
	for offset := range s {
		if remaining == 0 {
			return s[:offset]
		}
		remaining--
	}
	return s
}
// schemeDefaultPort returns the default port for the given URL scheme: 80
// for "http" and 443 for "https". Any other scheme yields 0. The comparison
// is case-sensitive.
func schemeDefaultPort(scheme string) int {
	if scheme == "https" {
		return 443
	}
	if scheme == "http" {
		return 80
	}
	return 0
}
// getRepresentativeCountFromTracestateHeader parses a tracestate header
// value, which is expected to be in the W3C Trace-Context format, and
// searches its "ot" entry for the p-value as specified in
//
// https://opentelemetry.io/docs/reference/specification/trace/tracestate-probability-sampling/#p-value
//
// to calculate the adjusted count (i.e. representative count), 2^p.
//
// If the p-value is missing, empty or not a decimal number, p stays at its
// default of 0, i.e. a sampling rate of 100% and a representative count
// of 1. A p-value of 63 yields a representative count of 0; so does any
// larger number, because strconv.ParseUint clamps out-of-range input to the
// largest 6-bit value.
func getRepresentativeCountFromTracestateHeader(tracestace string) float64 {
	// Default p-value is 0, leading to a default representative count of 1.
	var exponent uint64
	if ot := getValueForKeyInString(tracestace, "ot", ',', '='); ot != "" {
		if raw := getValueForKeyInString(ot, "p", ';', ':'); raw != "" {
			// The parse error is deliberately dropped: the value returned
			// alongside it (0 or the 6-bit maximum) is used as is.
			exponent, _ = strconv.ParseUint(raw, 10, 6)
		}
	}

	if exponent >= 63 {
		return 0.0
	}
	return math.Pow(2, float64(exponent))
}
// getValueForKeyInString scans str, a list of key/value entries delimited by
// separator, and returns the value of the first entry whose key equals key.
// Within an entry the key and value are divided by the first occurrence of
// assignChar. It returns the empty string when no entry matches.
//
// Whitespace around each entry is ignored, but whitespace between the key,
// assignChar and the value is significant: the key must match exactly and the
// value is returned untrimmed on its left-hand side. Entries without
// assignChar are skipped.
//
// separator and assignChar may be any valid rune, including multi-byte ones.
func getValueForKeyInString(str string, key string, separator rune, assignChar rune) string {
	// Splitting on the string form of each rune keeps the offsets correct
	// for runes that occupy more than one byte.
	sep, assign := string(separator), string(assignChar)

	for rest := strings.TrimSpace(str); rest != ""; rest = strings.TrimSpace(rest) {
		var entry string
		entry, rest, _ = strings.Cut(rest, sep)
		entry = strings.TrimSpace(entry)

		if k, v, found := strings.Cut(entry, assign); found && k == key {
			return v
		}
	}
	return ""
}