tracing/impl/stackdriver_tracer.go (301 lines of code) (raw):

//go:build tracer_static && tracer_static_stackdriver package impl import ( "context" "encoding/base64" "fmt" "io" "reflect" "strconv" "time" "contrib.go.opencensus.io/exporter/stackdriver" opentracing "github.com/opentracing/opentracing-go" opentracinglog "github.com/opentracing/opentracing-go/log" log "github.com/sirupsen/logrus" "go.opencensus.io/trace" "go.opencensus.io/trace/propagation" ) // to play nice with grpc_opentracing tagsCarrier // https://github.com/grpc-ecosystem/go-grpc-middleware/blob/a77ba4df9c270ec918ed6a6d506309078e3e4c4d/tracing/opentracing/id_extract.go#L30 // https://github.com/grpc-ecosystem/go-grpc-middleware/blob/a77ba4df9c270ec918ed6a6d506309078e3e4c4d/tracing/opentracing/options.go#L48 var traceHeader = "uber-trace-id" // https://pkg.go.dev/github.com/opentracing/opentracing-go#Tracer type adapterTracer struct { exporter *stackdriver.Exporter } // opencensus does not support overwriting StartTime; so we only implement Tags // https://pkg.go.dev/github.com/opentracing/opentracing-go#StartSpanOption func (tracer *adapterTracer) StartSpan(operationName string, opts ...opentracing.StartSpanOption) opentracing.Span { sso := opentracing.StartSpanOptions{} for _, o := range opts { o.Apply(&sso) } var ctx context.Context var adapterSpanCtx *adapterSpanContext ctx = context.Background() for _, ref := range sso.References { if ref.Type == opentracing.ChildOfRef { if v, ok := ref.ReferencedContext.(adapterSpanContext); ok { ctx = v.ctx adapterSpanCtx = &v break } } } var span *trace.Span if adapterSpanCtx != nil && adapterSpanCtx.remote { ctx, span = trace.StartSpanWithRemoteParent(ctx, operationName, *adapterSpanCtx.ocSpanCtx) } else { ctx, span = trace.StartSpan(ctx, operationName) } spanContext := span.SpanContext() adapterSpan := &adapterSpan{span, tracer, adapterSpanContext{ctx, false, &spanContext}} for k, v := range sso.Tags { adapterSpan.SetTag(k, v) } return adapterSpan } func (tracer *adapterTracer) Inject(sm opentracing.SpanContext, format interface{}, carrier interface{}) error { c, ok := sm.(adapterSpanContext) if !ok { return opentracing.ErrInvalidSpanContext } if format != opentracing.TextMap && format != opentracing.HTTPHeaders { return opentracing.ErrUnsupportedFormat } ocSpanCtx := trace.FromContext(c.ctx).SpanContext() encoded := base64.StdEncoding.EncodeToString(propagation.Binary(ocSpanCtx)) carrier.(opentracing.TextMapWriter).Set(traceHeader, string(encoded)) return nil } func (tracer *adapterTracer) Extract(format interface{}, carrier interface{}) (opentracing.SpanContext, error) { if format != opentracing.TextMap && format != opentracing.HTTPHeaders { return nil, opentracing.ErrUnsupportedFormat } var ocSpanCtx trace.SpanContext err := carrier.(opentracing.TextMapReader).ForeachKey(func(key, val string) error { var ok bool if key == traceHeader { decoded, err := base64.StdEncoding.DecodeString(val) if err != nil { return err } ocSpanCtx, ok = propagation.FromBinary(decoded) if !ok { return opentracing.ErrInvalidCarrier } } return nil }) if err != nil { return nil, err } return adapterSpanContext{ ctx: context.Background(), remote: true, ocSpanCtx: &ocSpanCtx, }, nil } type adapterSpanContext struct { ctx context.Context remote bool ocSpanCtx *trace.SpanContext } func (c adapterSpanContext) ForeachBaggageItem(handler func(k, v string) bool) {} func (c adapterSpanContext) IsSampled() bool { return c.ocSpanCtx.IsSampled() } // https://pkg.go.dev/go.opencensus.io/trace#Span // https://pkg.go.dev/github.com/opentracing/opentracing-go#Span type adapterSpan struct { span trace.SpanInterface tracer opentracing.Tracer spanContext adapterSpanContext } func (span *adapterSpan) Finish() { span.span.End() } func (span *adapterSpan) FinishWithOptions(opts opentracing.FinishOptions) { span.span.End() } func (span *adapterSpan) Context() opentracing.SpanContext { return span.spanContext } func (span *adapterSpan) SetOperationName(operationName string) opentracing.Span { span.span.SetName(operationName) return span } func castToAttribute(key string, value interface{}) []trace.Attribute { switch v := reflect.ValueOf(value); v.Kind() { case reflect.Bool: return []trace.Attribute{trace.BoolAttribute(key, v.Bool())} case reflect.Int, reflect.Int8, reflect.Int16, reflect.Int32, reflect.Int64: return []trace.Attribute{trace.Int64Attribute(key, v.Int())} case reflect.Float32, reflect.Float64: return []trace.Attribute{trace.Float64Attribute(key, v.Float())} case reflect.String: chunks := chunkString(v.String(), 256) attrs := []trace.Attribute{} for i, chunk := range chunks { var k string if i == 0 { k = key } else { k = key + "." + strconv.Itoa(i) } attrs = append(attrs, trace.StringAttribute(k, chunk)) } return attrs default: return []trace.Attribute{trace.StringAttribute(key, fmt.Sprintf("castToAttribute not implemented for type %+v", v.Kind()))} } } // stackdriver limits attribute values to 256 bytes // we use runes here, hopefully that's close enough // values get truncated silently, so we can set $key to the full value // and then $key.1, $key.2, $key.3 are explicitly sliced. func chunkString(s string, chunkSize int) []string { if len(s) == 0 { return []string{s} } var strs []string for i := 0; i*chunkSize < len(s); i++ { end := (i + 1) * chunkSize if end > len(s) { end = len(s) } strs = append(strs, s[i*chunkSize:end]) } return strs } func (span *adapterSpan) SetTag(key string, value interface{}) opentracing.Span { span.span.AddAttributes(castToAttribute(key, value)...) return span } func (span *adapterSpan) LogFields(fields ...opentracinglog.Field) { eventName := "" attributes := []trace.Attribute{} for _, field := range fields { if field.Key() == "event" { eventName = field.String() continue } attributes = append(attributes, castToAttribute(field.Key(), field.Value())...) } span.span.Annotate(attributes, eventName) } func (span *adapterSpan) LogKV(alternatingKeyValues ...interface{}) { if (len(alternatingKeyValues) % 2) != 0 { log.Print("stackdriver tracer: warning: even number of arguments required to LogKV") } attributes := []trace.Attribute{} eventName := "" for i := 0; i < len(alternatingKeyValues); i += 2 { key := alternatingKeyValues[i].(string) value := alternatingKeyValues[i+1] if key == "event" { eventName = value.(string) continue } attributes = append(attributes, castToAttribute(key, value)...) } span.span.Annotate(attributes, eventName) } func (span *adapterSpan) SetBaggageItem(restrictedKey, value string) opentracing.Span { return span } func (span *adapterSpan) BaggageItem(restrictedKey string) string { // not implemented return "" } func (span *adapterSpan) Tracer() opentracing.Tracer { return span.tracer } // Deprecated: use LogFields or LogKV. func (span *adapterSpan) LogEvent(event string) { // not implemented } // Deprecated: use LogFields or LogKV. func (span *adapterSpan) LogEventWithPayload(event string, payload interface{}) { // not implemented } // Deprecated: use LogFields or LogKV. func (span *adapterSpan) Log(data opentracing.LogData) { // not implemented } type stackdriverCloser struct { exporter *stackdriver.Exporter } func (c stackdriverCloser) Close() error { c.exporter.Flush() return nil } type stackdriverConfig struct { options *stackdriver.Options sampler trace.Sampler } // https://pkg.go.dev/contrib.go.opencensus.io/exporter/stackdriver#Options var stackdriverConfigMapper = map[string]func(traceCfg *stackdriverConfig, value string) error{ "project_id": func(stackdriverCfg *stackdriverConfig, value string) error { stackdriverCfg.options.ProjectID = value return nil }, "location": func(stackdriverCfg *stackdriverConfig, value string) error { stackdriverCfg.options.Location = value return nil }, "bundle_delay_threshold": func(stackdriverCfg *stackdriverConfig, value string) error { d, err := time.ParseDuration(value) if err != nil { return err } stackdriverCfg.options.BundleDelayThreshold = d return nil }, "bundle_count_threshold": func(stackdriverCfg *stackdriverConfig, value string) error { v, err := strconv.Atoi(value) if err != nil { return err } stackdriverCfg.options.BundleCountThreshold = v return nil }, "trace_spans_buffer_max_bytes": func(stackdriverCfg *stackdriverConfig, value string) error { v, err := strconv.Atoi(value) if err != nil { return err } stackdriverCfg.options.TraceSpansBufferMaxBytes = v return nil }, "timeout": func(stackdriverCfg *stackdriverConfig, value string) error { d, err := time.ParseDuration(value) if err != nil { return err } stackdriverCfg.options.Timeout = d return nil }, "number_of_workers": func(stackdriverCfg *stackdriverConfig, value string) error { v, err := strconv.Atoi(value) if err != nil { return err } stackdriverCfg.options.NumberOfWorkers = v return nil }, "sampler_probability": func(stackdriverCfg *stackdriverConfig, value string) error { v, err := strconv.ParseFloat(value, 64) if err != nil { return err } stackdriverCfg.sampler = trace.ProbabilitySampler(v) return nil }, } func stackdriverTracerFactory(config map[string]string) (opentracing.Tracer, io.Closer, error) { stackdriverCfg := &stackdriverConfig{ options: &stackdriver.Options{}, sampler: trace.NeverSample(), } for k, v := range config { mapper := stackdriverConfigMapper[k] if mapper != nil { err := mapper(stackdriverCfg, v) if err != nil { return nil, nil, err } } else { log.Printf("stackdriver tracer: warning: ignoring unknown configuration option: %s", k) } } exporter, err := stackdriver.NewExporter(*stackdriverCfg.options) if err != nil { return nil, nil, err } trace.RegisterExporter(exporter) trace.ApplyConfig(trace.Config{DefaultSampler: stackdriverCfg.sampler}) return &adapterTracer{exporter}, &stackdriverCloser{exporter}, nil } func init() { registerTracer("stackdriver", stackdriverTracerFactory) }