exporter/trace/trace_proto.go (301 lines of code) (raw):
// Copyright 2019 OpenTelemetry Authors
// Copyright 2021 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package trace
import (
"fmt"
"math"
"strconv"
"strings"
"time"
"unicode/utf8"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/codes"
sdktrace "go.opentelemetry.io/otel/sdk/trace"
"go.opentelemetry.io/otel/trace"
"cloud.google.com/go/trace/apiv2/tracepb"
codepb "google.golang.org/genproto/googleapis/rpc/code"
statuspb "google.golang.org/genproto/googleapis/rpc/status"
timestamppb "google.golang.org/protobuf/types/known/timestamppb"
wrapperspb "google.golang.org/protobuf/types/known/wrapperspb"
"github.com/GoogleCloudPlatform/opentelemetry-operations-go/internal/resourcemapping"
)
const (
// maxAnnotationEventsPerSpan caps how many span events are exported as
// annotations; events past this limit are only counted as dropped.
maxAnnotationEventsPerSpan = 32
// TODO(ymotongpoo): uncomment this once gRPC tracing is supported.
// maxMessageEventsPerSpan = 128.
// maxAttributeStringValue is the byte limit handed to trunc for string
// attribute values and annotation descriptions.
maxAttributeStringValue = 256
// maxNumLinks caps how many span links are exported; links past this limit
// are only counted as dropped.
maxNumLinks = 128
// agentLabel is the attribute key that carries the exporter's user agent
// string on spans that do not already have that key set.
agentLabel = "g.co/agent"
// Attributes recorded on the span for the requests.
// Only trace exporters will need them.
hostAttribute = "http.host"
methodAttribute = "http.method"
pathAttribute = "http.path"
urlAttribute = "http.url"
userAgentAttribute = "http.user_agent"
statusCodeAttribute = "http.status_code"
serviceAttribute = "service.name"
// Cloud Trace label keys that defaultAttributeMapping substitutes for the
// corresponding http.* attribute keys above.
labelHTTPHost = `/http/host`
labelHTTPMethod = `/http/method`
labelHTTPStatusCode = `/http/status_code`
labelHTTPPath = `/http/path`
labelHTTPUserAgent = `/http/user_agent`
// Attribute keys under which the instrumentation scope's name and version
// are attached to exported spans.
instrumentationScopeNameAttribute = "otel.scope.name"
instrumentationScopeVersionAttribute = "otel.scope.version"
)
// userAgent combines the OpenTelemetry library version with this exporter's
// version. It is the value stored under agentLabel when a span carries no
// agent label of its own.
var userAgent = fmt.Sprintf("opentelemetry-go %s; google-cloud-trace-exporter %s", otel.Version(), Version())
// attrs wraps a slice of OpenTelemetry attributes so that it can be handed to
// the resourcemapping package, which reads attributes through GetString.
type attrs struct {
	Attrs []attribute.KeyValue
}

// GetString scans the wrapped attributes in order and, for the first one whose
// key equals key, returns its Value.AsString() together with true. When no
// attribute matches it returns the empty string and false.
func (a *attrs) GetString(key string) (string, bool) {
	want := attribute.Key(key)
	for i := range a.Attrs {
		if a.Attrs[i].Key != want {
			continue
		}
		return a.Attrs[i].Value.AsString(), true
	}
	return "", false
}
// attributeWithLabelsFromResources returns the attribute list to export for
// sd. The list is built in this order:
//
//  1. the span's own attributes;
//  2. resource attributes whose key is not already present;
//  3. the instrumentation scope name (otel.scope.name) and, when non-empty,
//     the scope version (otel.scope.version), unless those keys are already
//     present;
//  4. one "g.co/r/{resource_type}/{resource_label}" attribute per label of the
//     monitored resource derived from the span's resource. These are appended
//     without a duplicate check.
//
// For steps 1-3, if there are duplicate keys then the first value found for
// the key is preserved.
//
// When the span's resource is empty the span's attribute slice is returned
// as-is and none of the additional attributes are added.
//
// The returned slice never shares writable spare capacity with the slice
// returned by sd.Attributes(): it is either that slice untouched (empty
// resource) or a fresh backing array.
func attributeWithLabelsFromResources(sd sdktrace.ReadOnlySpan) []attribute.KeyValue {
	spanAttrs := sd.Attributes()
	if sd.Resource().Len() == 0 {
		return spanAttrs
	}
	resourceAttrs := sd.Resource().Attributes()
	scope := sd.InstrumentationScope()

	// Cap the capacity at the current length so that the appends below are
	// forced to copy into a new backing array. Appending to spanAttrs directly
	// could write into spare capacity of an array the span still owns.
	attributes := spanAttrs[:len(spanAttrs):len(spanAttrs)]

	uniqueAttrs := make(map[attribute.Key]bool, len(spanAttrs))
	// Span Attributes take precedence.
	for _, attr := range spanAttrs {
		uniqueAttrs[attr.Key] = true
	}
	// Raw resource attributes are next.
	for _, attr := range resourceAttrs {
		if uniqueAttrs[attr.Key] {
			continue // skip resource attributes which conflict with span attributes
		}
		uniqueAttrs[attr.Key] = true
		attributes = append(attributes, attr)
	}
	// Instrumentation Scope attributes come next.
	if !uniqueAttrs[instrumentationScopeNameAttribute] {
		uniqueAttrs[instrumentationScopeNameAttribute] = true
		attributes = append(attributes, attribute.String(instrumentationScopeNameAttribute, scope.Name))
	}
	if !uniqueAttrs[instrumentationScopeVersionAttribute] && strings.Compare("", scope.Version) != 0 {
		uniqueAttrs[instrumentationScopeVersionAttribute] = true
		attributes = append(attributes, attribute.String(instrumentationScopeVersionAttribute, scope.Version))
	}
	// Monitored resource attributes (`g.co/r/{resource_type}/{resource_label}`) come next.
	gceResource := resourcemapping.ResourceAttributesToMonitoringMonitoredResource(&attrs{
		Attrs: resourceAttrs,
	})
	for key, value := range gceResource.Labels {
		name := fmt.Sprintf("g.co/r/%v/%v", gceResource.Type, key)
		attributes = append(attributes, attribute.String(name, value))
	}
	return attributes
}
// protoFromReadOnlySpan converts an SDK span into a Cloud Trace span proto.
//
// It returns the proto and the project ID the span is addressed to. The
// project ID is the exporter's configured one unless the span's resource
// carries the project ID attribute (gcp.project.id), which then wins.
// A nil span yields (nil, "").
func (e *traceExporter) protoFromReadOnlySpan(s sdktrace.ReadOnlySpan) (*tracepb.Span, string) {
	if s == nil {
		return nil, ""
	}

	spanCtx := s.SpanContext()
	parent := s.Parent()
	traceID := spanCtx.TraceID().String()
	spanID := spanCtx.SpanID().String()

	// Override the project ID with gcp.project.id, if present on the resource.
	projectID := e.projectID
	for _, kv := range s.Resource().Attributes() {
		if kv.Key != resourcemapping.ProjectIDAttributeKey {
			continue
		}
		projectID = kv.Value.AsString()
		break
	}

	sp := &tracepb.Span{
		Name:                    "projects/" + projectID + "/traces/" + traceID + "/spans/" + spanID,
		SpanId:                  spanID,
		DisplayName:             trunc(s.Name(), 128),
		StartTime:               timestampProto(s.StartTime()),
		EndTime:                 timestampProto(s.EndTime()),
		SameProcessAsParentSpan: &wrapperspb.BoolValue{Value: !parent.IsRemote()},
		SpanKind:                convertSpanKind(s.SpanKind()),
	}

	// Record a parent only when it is a valid ID that differs from the span's own.
	if parentID := parent.SpanID(); parentID.IsValid() && parentID != spanCtx.SpanID() {
		sp.ParentSpanId = parentID.String()
	}

	switch status := s.Status(); status.Code {
	case codes.Unset:
		// Don't set status code.
	case codes.Ok:
		sp.Status = &statuspb.Status{Code: int32(codepb.Code_OK)}
	case codes.Error:
		sp.Status = &statuspb.Status{Code: int32(codepb.Code_UNKNOWN), Message: status.Description}
	default:
		sp.Status = &statuspb.Status{Code: int32(codepb.Code_UNKNOWN)}
	}

	e.copyAttributes(&sp.Attributes, attributeWithLabelsFromResources(s))
	// NOTE(ymotongpoo): omitting copyMonitoringResourceAttributes()

	// Export at most maxAnnotationEventsPerSpan events as annotations, in
	// order, and report the remainder as dropped.
	// TODO(ymotongpoo): add implementations for Span_TimeEvent_MessageEvent_
	// once OTel finishes its gRPC implementation.
	if events := s.Events(); len(events) > 0 {
		kept, dropped := events, 0
		if len(events) > maxAnnotationEventsPerSpan {
			kept = events[:maxAnnotationEventsPerSpan]
			dropped = len(events) - maxAnnotationEventsPerSpan
		}
		timeEvents := &tracepb.Span_TimeEvents{}
		for _, ev := range kept {
			annotation := &tracepb.Span_TimeEvent_Annotation{
				Description: trunc(ev.Name, maxAttributeStringValue),
			}
			e.copyAttributes(&annotation.Attributes, ev.Attributes)
			timeEvents.TimeEvent = append(timeEvents.TimeEvent, &tracepb.Span_TimeEvent{
				Time:  timestampProto(ev.Time),
				Value: &tracepb.Span_TimeEvent_Annotation_{Annotation: annotation},
			})
		}
		if dropped != 0 {
			timeEvents.DroppedAnnotationsCount = clip32(dropped)
		}
		sp.TimeEvents = timeEvents
	}

	// The agent label below needs a map to write into even when the span had
	// no exportable attributes.
	if sp.Attributes == nil {
		sp.Attributes = &tracepb.Span_Attributes{
			AttributeMap: make(map[string]*tracepb.AttributeValue),
		}
	}

	// Only set the agent label if it is not already set. That enables the
	// OpenTelemetry service/collector to set the agent label based on the
	// library that sent the span to the service.
	// TODO(jsuereth): This scenario is highly unlikely. This would require
	// vanilla OTLP sources of traces to be setting "g.co/agent" labels on
	// spans. We should confirm and remove/update this code.
	if _, present := sp.Attributes.AttributeMap[agentLabel]; !present {
		sp.Attributes.AttributeMap[agentLabel] = &tracepb.AttributeValue{
			Value: &tracepb.AttributeValue_StringValue{
				StringValue: trunc(userAgent, maxAttributeStringValue),
			},
		}
	}

	sp.Links = e.linksProtoFromLinks(s.Links())
	return sp, projectID
}
// linksProtoFromLinks converts OTel span links to a Cloud Trace links proto,
// preserving their order. Only the first maxNumLinks links are converted; the
// number of links beyond that is reported in DroppedLinksCount. An empty
// input yields nil.
func (e *traceExporter) linksProtoFromLinks(links []sdktrace.Link) *tracepb.Span_Links {
	if len(links) == 0 {
		return nil
	}

	kept := links
	if len(kept) > maxNumLinks {
		kept = kept[:maxNumLinks]
	}

	out := &tracepb.Span_Links{}
	for i := range kept {
		sc := kept[i].SpanContext
		pb := &tracepb.Span_Link{
			TraceId: sc.TraceID().String(),
			SpanId:  sc.SpanID().String(),
			Type:    tracepb.Span_Link_TYPE_UNSPECIFIED,
		}
		e.copyAttributes(&pb.Attributes, kept[i].Attributes)
		out.Link = append(out.Link, pb)
	}
	out.DroppedLinksCount = clip32(len(links) - len(kept))
	return out
}
// timestampProto creates a timestamp proto for a time.Time.
func timestampProto(t time.Time) *timestamppb.Timestamp {
return ×tamppb.Timestamp{
Seconds: t.Unix(),
Nanos: int32(t.Nanosecond()),
}
}
// copyAttributes copies the attributes in in to the proto attribute message
// that out points to, creating the message and its map if either is nil.
// An empty in leaves *out untouched.
//
// Each key is passed through the exporter's mapAttribute option before being
// stored. Attributes whose value type attributeValue cannot represent are
// skipped without being counted. Attributes whose mapped key is longer than
// 128 bytes are left out and counted in DroppedAttributesCount, which is
// overwritten with the count from this call.
func (e *traceExporter) copyAttributes(out **tracepb.Span_Attributes, in []attribute.KeyValue) {
	if len(in) == 0 {
		return
	}

	dst := *out
	if dst == nil {
		dst = &tracepb.Span_Attributes{}
		*out = dst
	}
	if dst.AttributeMap == nil {
		dst.AttributeMap = make(map[string]*tracepb.AttributeValue)
	}

	var tooLong int32
	for i := range in {
		value := attributeValue(in[i])
		if value == nil {
			continue
		}
		mapped := e.o.mapAttribute(in[i].Key)
		if len(mapped) <= 128 {
			dst.AttributeMap[string(mapped)] = value
		} else {
			tooLong++
		}
	}
	dst.DroppedAttributesCount = tooLong
}
// defaultAttributeMapping rewrites the HTTP attribute keys that Cloud Trace
// uses for prominent UI functions to their `/http/...` label form. Every
// other key is returned unchanged.
func defaultAttributeMapping(k attribute.Key) attribute.Key {
	mapped := k
	switch k {
	case hostAttribute:
		mapped = labelHTTPHost
	case methodAttribute:
		mapped = labelHTTPMethod
	case pathAttribute:
		mapped = labelHTTPPath
	case statusCodeAttribute:
		mapped = labelHTTPStatusCode
	case userAgentAttribute:
		mapped = labelHTTPUserAgent
	}
	return mapped
}
// attributeValue converts an OTel attribute value to its Cloud Trace proto
// form. BOOL and INT64 map to the matching proto value kinds. FLOAT64 is
// formatted as a decimal string and, like STRING, is stored as a string value
// truncated to maxAttributeStringValue. Any other value type yields nil.
func attributeValue(keyValue attribute.KeyValue) *tracepb.AttributeValue {
	var text string
	switch val := keyValue.Value; val.Type() {
	case attribute.BOOL:
		return &tracepb.AttributeValue{
			Value: &tracepb.AttributeValue_BoolValue{BoolValue: val.AsBool()},
		}
	case attribute.INT64:
		return &tracepb.AttributeValue{
			Value: &tracepb.AttributeValue_IntValue{IntValue: val.AsInt64()},
		}
	case attribute.FLOAT64:
		// TODO: set double value if Google Cloud Trace support it in the future.
		text = strconv.FormatFloat(val.AsFloat64(), 'f', -1, 64)
	case attribute.STRING:
		text = val.AsString()
	default:
		return nil
	}
	return &tracepb.AttributeValue{
		Value: &tracepb.AttributeValue_StringValue{
			StringValue: trunc(text, maxAttributeStringValue),
		},
	}
}
// trunc returns a TruncatableString holding s cut down to at most limit
// bytes. After cutting, trailing bytes that do not form a complete UTF-8
// encoding are removed one at a time, so a multi-byte character split by the
// cut is dropped whole. TruncatedByteCount is the number of bytes of s that
// were left out; it is 0 when s already fits.
func trunc(s string, limit int) *tracepb.TruncatableString {
	if len(s) <= limit {
		return &tracepb.TruncatableString{
			Value:              s,
			TruncatedByteCount: 0,
		}
	}

	kept := s[:limit]
	for len(kept) > 0 {
		// (RuneError, 1) is how the decoder reports an invalid or incomplete
		// trailing encoding; anything else means the tail is a whole rune.
		if r, size := utf8.DecodeLastRuneInString(kept); r != utf8.RuneError || size != 1 {
			break
		}
		kept = kept[:len(kept)-1]
	}
	return &tracepb.TruncatableString{
		Value:              kept,
		TruncatedByteCount: clip32(len(s) - len(kept)),
	}
}
// clip32 converts x to an int32, saturating at math.MinInt32 and
// math.MaxInt32 when x lies outside the int32 range.
func clip32(x int) int32 {
	switch {
	case x > math.MaxInt32:
		return math.MaxInt32
	case x < math.MinInt32:
		return math.MinInt32
	default:
		return int32(x)
	}
}
// convertSpanKind maps an OTel span kind to the Cloud Trace span kind.
// Server, client, producer and consumer map to their namesakes. Internal,
// unspecified and any unrecognized kind all map to Span_INTERNAL.
func convertSpanKind(kind trace.SpanKind) tracepb.Span_SpanKind {
	switch kind {
	case trace.SpanKindServer:
		return tracepb.Span_SERVER
	case trace.SpanKindClient:
		return tracepb.Span_CLIENT
	case trace.SpanKindProducer:
		return tracepb.Span_PRODUCER
	case trace.SpanKindConsumer:
		return tracepb.Span_CONSUMER
	}
	// SpanKindUnspecified is not a valid SpanKind and is replaced with
	// INTERNAL, the same value used for SpanKindInternal and unknown kinds.
	return tracepb.Span_INTERNAL
}