tracing/impl/stackdriver_tracer.go (301 lines of code) (raw):
//go:build tracer_static && tracer_static_stackdriver
package impl
import (
	"context"
	"encoding/base64"
	"fmt"
	"io"
	"reflect"
	"strconv"
	"strings"
	"time"

	"contrib.go.opencensus.io/exporter/stackdriver"
	opentracing "github.com/opentracing/opentracing-go"
	opentracinglog "github.com/opentracing/opentracing-go/log"
	log "github.com/sirupsen/logrus"
	"go.opencensus.io/trace"
	"go.opencensus.io/trace/propagation"
)
// traceHeader is the carrier key under which Inject writes the encoded span
// context and which Extract looks for when reading a carrier.
//
// The name was chosen to play nice with the grpc_opentracing tagsCarrier:
// https://github.com/grpc-ecosystem/go-grpc-middleware/blob/a77ba4df9c270ec918ed6a6d506309078e3e4c4d/tracing/opentracing/id_extract.go#L30
// https://github.com/grpc-ecosystem/go-grpc-middleware/blob/a77ba4df9c270ec918ed6a6d506309078e3e4c4d/tracing/opentracing/options.go#L48
var traceHeader = "uber-trace-id"
// adapterTracer exposes OpenCensus tracing through the opentracing Tracer
// interface (StartSpan, Inject, Extract).
// https://pkg.go.dev/github.com/opentracing/opentracing-go#Tracer
type adapterTracer struct {
	// exporter is the Stackdriver exporter handed over by
	// stackdriverTracerFactory.
	exporter *stackdriver.Exporter
}
// StartSpan creates an OpenCensus span named operationName and returns it
// wrapped in an adapterSpan.
//
// Of the supplied options only References and Tags are honoured; opencensus
// does not support overwriting StartTime. The first ChildOf reference whose
// context is an adapterSpanContext becomes the parent: a context flagged as
// remote is attached with trace.StartSpanWithRemoteParent, any other one is
// continued through its stored context.Context. Without such a reference the
// span is started from context.Background().
// https://pkg.go.dev/github.com/opentracing/opentracing-go#StartSpanOption
func (t *adapterTracer) StartSpan(operationName string, opts ...opentracing.StartSpanOption) opentracing.Span {
	var options opentracing.StartSpanOptions
	for _, opt := range opts {
		opt.Apply(&options)
	}

	// Locate the parent, if any, among the references.
	parentCtx := context.Background()
	var parent *adapterSpanContext
	for _, ref := range options.References {
		if ref.Type != opentracing.ChildOfRef {
			continue
		}
		candidate, ok := ref.ReferencedContext.(adapterSpanContext)
		if !ok {
			continue
		}
		parentCtx, parent = candidate.ctx, &candidate
		break
	}

	var (
		spanCtx context.Context
		ocSpan  *trace.Span
	)
	if parent != nil && parent.remote {
		spanCtx, ocSpan = trace.StartSpanWithRemoteParent(parentCtx, operationName, *parent.ocSpanCtx)
	} else {
		spanCtx, ocSpan = trace.StartSpan(parentCtx, operationName)
	}

	ocSpanCtx := ocSpan.SpanContext()
	wrapped := &adapterSpan{
		span:   ocSpan,
		tracer: t,
		spanContext: adapterSpanContext{
			ctx:       spanCtx,
			remote:    false,
			ocSpanCtx: &ocSpanCtx,
		},
	}
	for key, value := range options.Tags {
		wrapped.SetTag(key, value)
	}
	return wrapped
}
// Inject writes the OpenCensus span context held by sm into carrier under the
// traceHeader key, encoded as base64 of the OpenCensus binary propagation
// format.
//
// It returns
//   - opentracing.ErrInvalidSpanContext when sm is not an adapterSpanContext
//     or carries neither a span context nor a context.Context,
//   - opentracing.ErrUnsupportedFormat when format is neither TextMap nor
//     HTTPHeaders,
//   - opentracing.ErrInvalidCarrier when carrier is not a TextMapWriter.
func (t *adapterTracer) Inject(sm opentracing.SpanContext, format interface{}, carrier interface{}) error {
	c, ok := sm.(adapterSpanContext)
	if !ok {
		return opentracing.ErrInvalidSpanContext
	}
	if format != opentracing.TextMap && format != opentracing.HTTPHeaders {
		return opentracing.ErrUnsupportedFormat
	}
	// A checked assertion: a carrier of the wrong type is a caller error that
	// should be reported, not a reason to panic.
	writer, ok := carrier.(opentracing.TextMapWriter)
	if !ok {
		return opentracing.ErrInvalidCarrier
	}

	var ocSpanCtx trace.SpanContext
	switch {
	case c.ocSpanCtx != nil:
		// Both StartSpan and Extract populate ocSpanCtx. Reading it directly
		// also works for contexts returned by Extract, whose ctx is
		// context.Background() and therefore carries no span.
		ocSpanCtx = *c.ocSpanCtx
	case c.ctx != nil:
		ocSpanCtx = trace.FromContext(c.ctx).SpanContext()
	default:
		return opentracing.ErrInvalidSpanContext
	}

	writer.Set(traceHeader, base64.StdEncoding.EncodeToString(propagation.Binary(ocSpanCtx)))
	return nil
}
// Extract reads the span context stored under the traceHeader key of carrier
// and returns it as an adapterSpanContext flagged as remote.
//
// The key is matched case-insensitively because carriers backed by HTTP
// headers report keys in canonical form (e.g. "Uber-Trace-Id").
//
// It returns
//   - opentracing.ErrUnsupportedFormat when format is neither TextMap nor
//     HTTPHeaders,
//   - opentracing.ErrInvalidCarrier when carrier is not a TextMapReader or
//     the decoded bytes are not a valid OpenCensus binary span context,
//   - the base64 decoding error when the header value is not valid base64,
//   - opentracing.ErrSpanContextNotFound when the carrier has no trace header.
func (t *adapterTracer) Extract(format interface{}, carrier interface{}) (opentracing.SpanContext, error) {
	if format != opentracing.TextMap && format != opentracing.HTTPHeaders {
		return nil, opentracing.ErrUnsupportedFormat
	}
	reader, ok := carrier.(opentracing.TextMapReader)
	if !ok {
		return nil, opentracing.ErrInvalidCarrier
	}

	var (
		ocSpanCtx trace.SpanContext
		found     bool
	)
	err := reader.ForeachKey(func(key, val string) error {
		if !strings.EqualFold(key, traceHeader) {
			return nil
		}
		decoded, err := base64.StdEncoding.DecodeString(val)
		if err != nil {
			return err
		}
		sc, ok := propagation.FromBinary(decoded)
		if !ok {
			return opentracing.ErrInvalidCarrier
		}
		ocSpanCtx, found = sc, true
		return nil
	})
	if err != nil {
		return nil, err
	}
	if !found {
		// Without this check a zero span context would be returned as a
		// remote parent, and StartSpan would then create spans under an
		// all-zero trace ID.
		return nil, opentracing.ErrSpanContextNotFound
	}

	return adapterSpanContext{
		ctx:       context.Background(),
		remote:    true,
		ocSpanCtx: &ocSpanCtx,
	}, nil
}
// adapterSpanContext is the opentracing SpanContext handed out by this
// adapter. It is created either by StartSpan (local span) or by Extract
// (context decoded from a carrier).
type adapterSpanContext struct {
	// ctx is the context.Context returned when the span was started, or
	// context.Background() for contexts produced by Extract.
	ctx context.Context
	// remote is true for contexts produced by Extract and false for contexts
	// of spans started locally.
	remote bool
	// ocSpanCtx points at the OpenCensus span context this value wraps.
	ocSpanCtx *trace.SpanContext
}
// ForeachBaggageItem is a no-op: the handler is never invoked. SetBaggageItem
// and BaggageItem on adapterSpan are likewise no-ops.
func (c adapterSpanContext) ForeachBaggageItem(handler func(k, v string) bool) {}
// IsSampled reports whether the wrapped OpenCensus span context is sampled.
// A context without an OpenCensus span context (e.g. the zero value) is
// reported as not sampled instead of dereferencing a nil pointer.
func (c adapterSpanContext) IsSampled() bool {
	if c.ocSpanCtx == nil {
		return false
	}
	return c.ocSpanCtx.IsSampled()
}
// adapterSpan wraps an OpenCensus span so it can be used as an opentracing
// Span.
// https://pkg.go.dev/go.opencensus.io/trace#Span
// https://pkg.go.dev/github.com/opentracing/opentracing-go#Span
type adapterSpan struct {
	// span is the underlying OpenCensus span every operation is forwarded to.
	span trace.SpanInterface
	// tracer is the tracer that created this span; returned by Tracer().
	tracer opentracing.Tracer
	// spanContext is the value returned by Context().
	spanContext adapterSpanContext
}
// Finish ends the underlying OpenCensus span.
func (s *adapterSpan) Finish() {
	s.span.End()
}
// FinishWithOptions ends the underlying OpenCensus span. The supplied opts
// are not consulted; the call behaves exactly like Finish.
func (s *adapterSpan) FinishWithOptions(opts opentracing.FinishOptions) {
	s.span.End()
}
// Context returns the adapterSpanContext that was built when the span was
// started.
func (s *adapterSpan) Context() opentracing.SpanContext {
	return s.spanContext
}
// SetOperationName renames the underlying OpenCensus span and returns the
// receiver so calls can be chained.
func (s *adapterSpan) SetOperationName(operationName string) opentracing.Span {
	s.span.SetName(operationName)
	return s
}
// castToAttribute converts an arbitrary tag or log value into OpenCensus
// attributes, dispatching on the value's reflect.Kind:
//
//   - bool becomes a bool attribute;
//   - signed and unsigned integers become an int64 attribute (an unsigned
//     value that does not fit in int64 is stored as its decimal string);
//   - floats become a float64 attribute;
//   - strings are split by chunkString into pieces of at most 256 bytes and
//     stored under key, key.1, key.2, ...;
//   - any other kind (including a nil value) yields a single string attribute
//     stating that the conversion is not implemented for that kind.
func castToAttribute(key string, value interface{}) []trace.Attribute {
	switch v := reflect.ValueOf(value); v.Kind() {
	case reflect.Bool:
		return []trace.Attribute{trace.BoolAttribute(key, v.Bool())}
	case reflect.Int, reflect.Int8, reflect.Int16, reflect.Int32, reflect.Int64:
		return []trace.Attribute{trace.Int64Attribute(key, v.Int())}
	case reflect.Uint, reflect.Uint8, reflect.Uint16, reflect.Uint32, reflect.Uint64, reflect.Uintptr:
		u := v.Uint()
		if u > 1<<63-1 {
			// Does not fit in int64; keep the exact value as text rather
			// than letting the conversion wrap around.
			return []trace.Attribute{trace.StringAttribute(key, strconv.FormatUint(u, 10))}
		}
		return []trace.Attribute{trace.Int64Attribute(key, int64(u))}
	case reflect.Float32, reflect.Float64:
		return []trace.Attribute{trace.Float64Attribute(key, v.Float())}
	case reflect.String:
		chunks := chunkString(v.String(), 256)
		attrs := make([]trace.Attribute, 0, len(chunks))
		for i, chunk := range chunks {
			k := key
			if i > 0 {
				k = key + "." + strconv.Itoa(i)
			}
			attrs = append(attrs, trace.StringAttribute(k, chunk))
		}
		return attrs
	default:
		return []trace.Attribute{trace.StringAttribute(key, fmt.Sprintf("castToAttribute not implemented for type %+v", v.Kind()))}
	}
}
// chunkString splits s into consecutive pieces of at most chunkSize bytes
// whose concatenation equals s.
//
// stackdriver limits attribute values to 256 bytes and truncates longer
// values silently, so callers store each piece under its own key ($key,
// $key.1, $key.2, ...).
//
// A cut never lands inside a multi-byte UTF-8 sequence when an earlier rune
// boundary exists within the chunk, so valid UTF-8 input yields valid UTF-8
// pieces. Only when chunkSize is smaller than the rune at the start of a
// chunk is the rune split, to guarantee progress.
//
// An empty s, or a chunkSize that is not positive, returns s as the single
// piece.
func chunkString(s string, chunkSize int) []string {
	if len(s) == 0 || chunkSize <= 0 {
		return []string{s}
	}
	strs := make([]string, 0, len(s)/chunkSize+1)
	for start := 0; start < len(s); {
		end := len(s)
		if chunkSize < len(s)-start {
			end = start + chunkSize
			// Step back over UTF-8 continuation bytes (bit pattern 10xxxxxx)
			// so the cut falls on the first byte of a rune.
			cut := end
			for cut > start && s[cut]&0xC0 == 0x80 {
				cut--
			}
			if cut > start {
				end = cut
			}
		}
		strs = append(strs, s[start:end])
		start = end
	}
	return strs
}
// SetTag converts value with castToAttribute and adds the resulting
// attributes to the underlying OpenCensus span. It returns the receiver so
// calls can be chained.
func (s *adapterSpan) SetTag(key string, value interface{}) opentracing.Span {
	attrs := castToAttribute(key, value)
	s.span.AddAttributes(attrs...)
	return s
}
// LogFields records the given fields as one annotation on the underlying
// OpenCensus span.
//
// The field keyed "event" supplies the annotation message; if it occurs more
// than once the last one wins. Every other field is converted with
// castToAttribute and attached as annotation attributes.
//
// The message is the field's value only, matching LogKV. Field.String()
// renders "key:value" and would prefix the message with "event:".
func (s *adapterSpan) LogFields(fields ...opentracinglog.Field) {
	eventName := ""
	attributes := make([]trace.Attribute, 0, len(fields))
	for _, field := range fields {
		if field.Key() == "event" {
			eventName = fmt.Sprint(field.Value())
			continue
		}
		attributes = append(attributes, castToAttribute(field.Key(), field.Value())...)
	}
	s.span.Annotate(attributes, eventName)
}
// LogKV records alternating key/value arguments as one annotation on the
// underlying OpenCensus span.
//
// The value of the "event" key supplies the annotation message (formatted
// with fmt.Sprint, so non-string values are accepted); every other pair is
// converted with castToAttribute and attached as annotation attributes.
//
// Malformed input is logged and skipped rather than causing a panic: a
// trailing key without a value is ignored, and so is any pair whose key is
// not a string.
func (s *adapterSpan) LogKV(alternatingKeyValues ...interface{}) {
	if len(alternatingKeyValues)%2 != 0 {
		log.Print("stackdriver tracer: warning: even number of arguments required to LogKV")
	}
	attributes := make([]trace.Attribute, 0, len(alternatingKeyValues)/2)
	eventName := ""
	// i+1 < len(...) stops before an unpaired trailing key.
	for i := 0; i+1 < len(alternatingKeyValues); i += 2 {
		key, ok := alternatingKeyValues[i].(string)
		if !ok {
			log.Printf("stackdriver tracer: warning: ignoring non-string LogKV key of type %T", alternatingKeyValues[i])
			continue
		}
		value := alternatingKeyValues[i+1]
		if key == "event" {
			eventName = fmt.Sprint(value)
			continue
		}
		attributes = append(attributes, castToAttribute(key, value)...)
	}
	s.span.Annotate(attributes, eventName)
}
// SetBaggageItem is not implemented: the item is discarded and the receiver
// is returned unchanged.
func (s *adapterSpan) SetBaggageItem(restrictedKey, value string) opentracing.Span {
	// not implemented
	return s
}
// BaggageItem is not implemented and always returns the empty string.
func (s *adapterSpan) BaggageItem(restrictedKey string) string {
	// not implemented
	return ""
}
// Tracer returns the tracer that was recorded on the span when it was
// created.
func (s *adapterSpan) Tracer() opentracing.Tracer {
	return s.tracer
}
// LogEvent is not implemented; calling it has no effect.
//
// Deprecated: use LogFields or LogKV.
func (s *adapterSpan) LogEvent(event string) {
	// not implemented
}
// LogEventWithPayload is not implemented; calling it has no effect.
//
// Deprecated: use LogFields or LogKV.
func (s *adapterSpan) LogEventWithPayload(event string, payload interface{}) {
	// not implemented
}
// Log is not implemented; calling it has no effect.
//
// Deprecated: use LogFields or LogKV.
func (s *adapterSpan) Log(data opentracing.LogData) {
	// not implemented
}
// stackdriverCloser is the io.Closer returned by stackdriverTracerFactory
// alongside the tracer.
type stackdriverCloser struct {
	// exporter is the exporter flushed by Close.
	exporter *stackdriver.Exporter
}
// Close flushes the exporter. It always returns nil.
func (c stackdriverCloser) Close() error {
	c.exporter.Flush()
	return nil
}
// stackdriverConfig collects the settings parsed from the string
// configuration map before the exporter is created.
type stackdriverConfig struct {
	// options is passed to stackdriver.NewExporter.
	options *stackdriver.Options
	// sampler is installed as the OpenCensus default sampler.
	sampler trace.Sampler
}
// stackdriverConfigMapper maps every recognised configuration key to a setter
// that parses the raw string value and stores the result on the
// stackdriverConfig. When parsing fails the setter returns the parse error
// unchanged and leaves the config untouched.
// https://pkg.go.dev/contrib.go.opencensus.io/exporter/stackdriver#Options
var stackdriverConfigMapper = map[string]func(traceCfg *stackdriverConfig, value string) error{
	// Plain string options.
	"project_id": func(cfg *stackdriverConfig, raw string) error {
		cfg.options.ProjectID = raw
		return nil
	},
	"location": func(cfg *stackdriverConfig, raw string) error {
		cfg.options.Location = raw
		return nil
	},

	// Durations, in time.ParseDuration syntax.
	"bundle_delay_threshold": func(cfg *stackdriverConfig, raw string) error {
		parsed, err := time.ParseDuration(raw)
		if err == nil {
			cfg.options.BundleDelayThreshold = parsed
		}
		return err
	},
	"timeout": func(cfg *stackdriverConfig, raw string) error {
		parsed, err := time.ParseDuration(raw)
		if err == nil {
			cfg.options.Timeout = parsed
		}
		return err
	},

	// Decimal integers.
	"bundle_count_threshold": func(cfg *stackdriverConfig, raw string) error {
		parsed, err := strconv.Atoi(raw)
		if err == nil {
			cfg.options.BundleCountThreshold = parsed
		}
		return err
	},
	"trace_spans_buffer_max_bytes": func(cfg *stackdriverConfig, raw string) error {
		parsed, err := strconv.Atoi(raw)
		if err == nil {
			cfg.options.TraceSpansBufferMaxBytes = parsed
		}
		return err
	},
	"number_of_workers": func(cfg *stackdriverConfig, raw string) error {
		parsed, err := strconv.Atoi(raw)
		if err == nil {
			cfg.options.NumberOfWorkers = parsed
		}
		return err
	},

	// Sampling probability, parsed as a 64-bit float and handed to
	// trace.ProbabilitySampler.
	"sampler_probability": func(cfg *stackdriverConfig, raw string) error {
		probability, err := strconv.ParseFloat(raw, 64)
		if err == nil {
			cfg.sampler = trace.ProbabilitySampler(probability)
		}
		return err
	},
}
// stackdriverTracerFactory builds a Stackdriver-backed tracer from a string
// configuration map.
//
// Each key is looked up in stackdriverConfigMapper; unknown keys are logged
// and ignored. The sampler defaults to trace.NeverSample() unless the
// configuration overrides it. On success the exporter is registered with the
// OpenCensus trace package and the sampler is applied as the process-wide
// default.
//
// It returns the tracer, a closer that flushes the exporter, and an error
// when an option value cannot be parsed or the exporter cannot be created.
// Errors are wrapped with %w and name the offending option, so the cause is
// identifiable and still reachable through errors.Is / errors.As.
func stackdriverTracerFactory(config map[string]string) (opentracing.Tracer, io.Closer, error) {
	stackdriverCfg := &stackdriverConfig{
		options: &stackdriver.Options{},
		sampler: trace.NeverSample(),
	}
	for k, v := range config {
		mapper := stackdriverConfigMapper[k]
		if mapper == nil {
			log.Printf("stackdriver tracer: warning: ignoring unknown configuration option: %s", k)
			continue
		}
		if err := mapper(stackdriverCfg, v); err != nil {
			return nil, nil, fmt.Errorf("stackdriver tracer: invalid value %q for option %q: %w", v, k, err)
		}
	}

	exporter, err := stackdriver.NewExporter(*stackdriverCfg.options)
	if err != nil {
		return nil, nil, fmt.Errorf("stackdriver tracer: creating exporter: %w", err)
	}
	trace.RegisterExporter(exporter)
	trace.ApplyConfig(trace.Config{DefaultSampler: stackdriverCfg.sampler})
	return &adapterTracer{exporter}, &stackdriverCloser{exporter}, nil
}
// init registers stackdriverTracerFactory under the name "stackdriver".
func init() {
	registerTracer("stackdriver", stackdriverTracerFactory)
}