module/apmotel/span.go (333 lines of code) (raw):
// Licensed to Elasticsearch B.V. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package apmotel // import "go.elastic.co/apm/module/apmotel/v2"
import (
"fmt"
"net/http"
"net/url"
"reflect"
"runtime"
"sync"
"time"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/codes"
semconv "go.opentelemetry.io/otel/semconv/v1.17.0"
"go.opentelemetry.io/otel/trace"
"go.opentelemetry.io/otel/trace/embedded"
"go.elastic.co/apm/module/apmhttp/v2"
"go.elastic.co/apm/v2"
)
type event struct {
Name string
Attributes []attribute.KeyValue
Time time.Time
}
type status struct {
Code codes.Code
Description string
}
type span struct {
mu sync.Mutex
provider *tracerProvider
ended bool
startTime time.Time
attributes []attribute.KeyValue
events []event
spanContext trace.SpanContext
status status
spanKind trace.SpanKind
tx *apm.Transaction
span *apm.Span
embedded.Span
}
func (s *span) End(options ...trace.SpanEndOption) {
s.mu.Lock()
defer s.mu.Unlock()
if s.ended {
return
}
config := trace.NewSpanEndConfig(options...)
if !config.Timestamp().IsZero() {
duration := config.Timestamp().Sub(s.startTime)
if s.span != nil {
s.span.Duration = duration
} else {
s.tx.Duration = duration
}
}
var outcome string
switch s.status.Code {
case codes.Ok:
outcome = "success"
case codes.Error:
outcome = "failure"
case codes.Unset:
outcome = "unknown"
}
for iter := s.provider.resource.Iter(); iter.Next(); {
s.attributes = append(s.attributes, iter.Attribute())
}
s.ended = true
if s.span != nil {
s.setSpanAttributes()
s.span.Outcome = outcome
s.span.End()
return
}
s.setTransactionAttributes()
s.tx.Outcome = outcome
s.tx.End()
}
func (s *span) AddEvent(name string, opts ...trace.EventOption) {
c := trace.NewEventConfig(opts...)
e := event{Name: name, Attributes: c.Attributes(), Time: c.Timestamp()}
s.mu.Lock()
s.events = append(s.events, e)
s.mu.Unlock()
}
func (s *span) AddLink(tl trace.Link) {
s.mu.Lock()
defer s.mu.Unlock()
l := apm.SpanLink{
Trace: [16]byte(tl.SpanContext.TraceID()),
Span: [8]byte(tl.SpanContext.SpanID()),
}
if s.span != nil {
s.span.AddLink(l)
} else {
s.tx.AddLink(l)
}
}
func (s *span) IsRecording() bool {
if s.span != nil {
return !s.span.Dropped()
}
return s.tx.Sampled()
}
func (s *span) RecordError(err error, opts ...trace.EventOption) {
if s == nil || err == nil || !s.IsRecording() {
return
}
opts = append(opts, trace.WithAttributes(
semconv.ExceptionType(typeStr(err)),
semconv.ExceptionMessage(err.Error()),
))
c := trace.NewEventConfig(opts...)
if c.StackTrace() {
opts = append(opts, trace.WithAttributes(
semconv.ExceptionStacktrace(recordStackTrace()),
))
}
s.AddEvent(semconv.ExceptionEventName, opts...)
}
func (s *span) SpanContext() trace.SpanContext {
return s.spanContext
}
func (s *span) SetStatus(code codes.Code, description string) {
s.mu.Lock()
defer s.mu.Unlock()
if s.status.Code > code {
return
}
status := status{Code: code}
if code == codes.Error {
status.Description = description
}
s.status = status
}
func (s *span) SetName(name string) {
s.mu.Lock()
defer s.mu.Unlock()
if s.span != nil {
s.span.Name = name
} else {
s.tx.Name = name
}
}
func (s *span) SetAttributes(kv ...attribute.KeyValue) {
s.mu.Lock()
defer s.mu.Unlock()
for _, a := range kv {
if !a.Valid() {
// Drop all invalid attributes.
continue
}
s.attributes = append(s.attributes, a)
}
}
func (s *span) TracerProvider() trace.TracerProvider {
return s.provider
}
// setSpanAttributes matches some span attributes with our custom ones.
// See https://github.com/elastic/apm/blob/main/specs/agents/tracing-api-otel.md#span-type-sub-type-and-service-target
func (s *span) setSpanAttributes() {
var (
dbContext apm.DatabaseSpanContext
httpURL string
httpMethod string
httpHost string
netPeerPort string
netPeerName string
haveDBContext bool
haveHTTPContext bool
haveHTTPHostTag bool
serviceTargetName string
serviceTargetType string
)
agentAttrs := make(map[string]interface{})
for _, v := range s.attributes {
agentAttrs[string(v.Key)] = v.Value.AsInterface()
switch v.Key {
case "component":
s.span.Subtype = v.Value.Emit()
case "db.system":
s.span.Type = "db"
s.span.Subtype = v.Value.Emit()
dbContext.Type = v.Value.Emit()
serviceTargetType = v.Value.Emit()
haveDBContext = true
case "db.instance":
dbContext.Instance = v.Value.Emit()
haveDBContext = true
case "db.statement":
dbContext.Statement = v.Value.Emit()
haveDBContext = true
case "db.user":
dbContext.User = v.Value.Emit()
haveDBContext = true
case "db.name":
serviceTargetName = v.Value.Emit()
case "messaging.system":
s.span.Type = "messaging"
s.span.Subtype = v.Value.Emit()
serviceTargetType = v.Value.Emit()
case "messaging.destination":
serviceTargetName = v.Value.Emit()
case "rpc.system":
s.span.Type = "external"
s.span.Subtype = v.Value.Emit()
serviceTargetType = v.Value.Emit()
case "rpc.service":
serviceTargetName = v.Value.Emit()
case "net.peer.port":
netPeerPort = v.Value.Emit()
case "net.peer.name":
netPeerName = v.Value.Emit()
case "http.url":
s.span.Type = "external"
s.span.Subtype = "http"
serviceTargetName = v.Value.Emit()
haveHTTPContext = true
httpURL = v.Value.Emit()
case "http.scheme":
s.span.Type = "external"
s.span.Subtype = "http"
case "http.method":
haveHTTPContext = true
httpMethod = v.Value.Emit()
case "http.host":
haveHTTPContext = true
haveHTTPHostTag = true
httpHost = v.Value.Emit()
}
}
if netPeerPort != "" && netPeerName != "" {
serviceTargetName = fmt.Sprintf("%s:%s", netPeerName, netPeerPort)
}
switch {
case haveHTTPContext:
url, err := url.Parse(httpURL)
if err == nil {
// handles the case where the url.Host hasn't been set.
// Tries obtaining the host value from the "http.host" tag.
// If not found, or if the url.Host has a value, it won't
// mutate the existing host.
if url.Host == "" && haveHTTPHostTag {
url.Host = httpHost
}
s.span.Context.SetHTTPRequest(&http.Request{
Method: httpMethod,
URL: url,
})
}
case haveDBContext:
s.span.Context.SetDatabase(dbContext)
}
if serviceTargetType != "" || serviceTargetName != "" {
s.span.Context.SetServiceTarget(apm.ServiceTargetSpanContext{
Type: serviceTargetType,
Name: serviceTargetName,
})
}
s.span.Context.SetOTelAttributes(agentAttrs)
s.span.Context.SetOTelSpanKind(s.spanKind.String())
}
// setTransactionAttributes matches some of the transaction attributes with our custom ones
// See https://github.com/elastic/apm/blob/main/specs/agents/tracing-api-otel.md#transaction-type
func (s *span) setTransactionAttributes() {
var (
isHTTP bool
isRPC bool
isMessaging bool
httpMethod string
httpStatusCode = -1
httpURL string
)
agentAttrs := make(map[string]interface{})
for _, v := range s.attributes {
agentAttrs[string(v.Key)] = v.Value.AsInterface()
switch v.Key {
case "component":
s.tx.Type = v.Value.Emit()
case "http.method":
httpMethod = v.Value.Emit()
case "http.status_code":
if code := v.Value.AsInt64(); code > 0 {
httpStatusCode = int(code)
}
case "http.url":
httpURL = v.Value.Emit()
isHTTP = true
case "http.scheme":
isHTTP = true
case "rpc.system":
isRPC = true
case "messaging.system":
isMessaging = true
case "result":
s.tx.Result = v.Value.Emit()
case "user.id":
s.tx.Context.SetUserID(v.Value.Emit())
case "user.email":
s.tx.Context.SetUserEmail(v.Value.Emit())
case "user.username":
s.tx.Context.SetUsername(v.Value.Emit())
}
}
if s.tx.Type == "" {
if s.spanKind == trace.SpanKindServer && (isHTTP || isRPC) {
s.tx.Type = "request"
} else if s.spanKind == trace.SpanKindConsumer && isMessaging {
s.tx.Type = "messaging"
} else {
s.tx.Type = "unknown"
}
}
if s.tx.Result == "" && httpStatusCode != -1 {
s.tx.Result = apmhttp.StatusCodeResult(httpStatusCode)
s.tx.Context.SetHTTPStatusCode(httpStatusCode)
}
if isHTTP {
if uri, err := url.ParseRequestURI(httpURL); err == nil {
var req http.Request
req.ProtoMajor = 1 // Assume HTTP/1.1
req.ProtoMinor = 1
req.Method = httpMethod
req.URL = uri
s.tx.Context.SetHTTPRequest(&req)
}
}
s.tx.Context.SetOTelAttributes(agentAttrs)
s.tx.Context.SetOTelSpanKind(s.spanKind.String())
}
func typeStr(i interface{}) string {
t := reflect.TypeOf(i)
if t.PkgPath() == "" && t.Name() == "" {
// Likely a builtin type.
return t.String()
}
return fmt.Sprintf("%s.%s", t.PkgPath(), t.Name())
}
func recordStackTrace() string {
stackTrace := make([]byte, 2048)
n := runtime.Stack(stackTrace, false)
return string(stackTrace[0:n])
}