crossdock/behavior/trace/behavior.go (198 lines of code) (raw):

// Copyright (c) 2016 Uber Technologies, Inc. // // Permission is hereby granted, free of charge, to any person obtaining a copy // of this software and associated documentation files (the "Software"), to deal // in the Software without restriction, including without limitation the rights // to use, copy, modify, merge, publish, distribute, sublicense, and/or sell // copies of the Software, and to permit persons to whom the Software is // furnished to do so, subject to the following conditions: // // The above copyright notice and this permission notice shall be included in // all copies or substantial portions of the Software. // // THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR // IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, // FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE // AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER // LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, // OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN // THE SOFTWARE. package trace import ( "fmt" "strconv" "time" "github.com/uber/tchannel-go" "github.com/uber/tchannel-go/crossdock/log" "github.com/crossdock/crossdock-go" "github.com/opentracing/opentracing-go" "github.com/opentracing/opentracing-go/ext" "github.com/uber/jaeger-client-go" "github.com/uber/jaeger-client-go/utils" "golang.org/x/net/context" ) // Different parameter keys and values used by the system const ( BehaviorName = "trace" ) // Behavior is an implementation of "trace behavior", that verifies // that a tracing context and baggage are properly propagated through // two level of servers. type Behavior struct { ServerPort string Tracer opentracing.Tracer ServiceToHost func(string) string ch *tchannel.Channel thriftCall DownstreamCall jsonCall DownstreamCall } // DownstreamCall is an encoding-agnostic abstraction of calling a downstream service. type DownstreamCall func(ctx context.Context, target *Downstream) (*Response, error) // Register function adds JSON and Thrift handlers to the server channel ch func (b *Behavior) Register(ch *tchannel.Channel) { b.registerThrift(ch) b.registerJSON(ch) } // Run executes the trace behavior func (b *Behavior) Run(t crossdock.T) { logParams(t) sampled, err := strconv.ParseBool(t.Param(sampledParam)) if err != nil { t.Fatalf("Malformed param %s: %s", sampledParam, err) } baggage := randomBaggage() level1 := &Request{ ServerRole: RoleS1, } server1 := t.Param(server1NameParam) level2 := &Downstream{ ServiceName: t.Param(server2NameParam), ServerRole: RoleS2, HostPort: fmt.Sprintf("%s:%s", b.serviceToHost(t.Param(server2NameParam)), b.ServerPort, ), Encoding: t.Param(server2EncodingParam), } level1.Downstream = level2 level3 := &Downstream{ ServiceName: t.Param(server3NameParam), ServerRole: RoleS3, HostPort: fmt.Sprintf("%s:%s", b.serviceToHost(t.Param(server3NameParam)), b.ServerPort, ), Encoding: t.Param(server3EncodingParam), } level2.Downstream = level3 resp, err := b.startTrace(t, level1, sampled, baggage) if err != nil { t.Errorf("Failed to startTrace in S1(%s): %s", server1, err.Error()) return } log.Printf("Response: span=%+v, downstream=%+v", resp.Span, resp.Downstream) traceID := resp.Span.TraceID require := crossdock.Require(t) require.NotEmpty(traceID, "Trace ID should not be empty in S1(%s)", server1) if validateTrace(t, level1.Downstream, resp, server1, 1, traceID, sampled, baggage) { t.Successf("trace checks out") log.Println("PASS") } else { log.Println("FAIL") } } func logParams(t crossdock.T) { keys := []string{ sampledParam, server1NameParam, server2NameParam, server2EncodingParam, server3NameParam, server3EncodingParam, } out := "Execute" for _, key := range keys { out = fmt.Sprintf("%s %s=%s", out, key, t.Param(key)) } log.Println(out) } func (b *Behavior) serviceToHost(service string) string { if b.ServiceToHost != nil { return b.ServiceToHost(service) } return service } func (b *Behavior) startTrace(t crossdock.T, req *Request, sampled bool, baggage string) (*Response, error) { span := b.Tracer.StartSpan(req.ServerRole) if sampled { ext.SamplingPriority.Set(span, 1) } span.SetBaggageItem(BaggageKey, baggage) defer span.Finish() ctx, cancel := context.WithTimeout(context.Background(), time.Second*2) defer cancel() ctx = opentracing.ContextWithSpan(ctx, span) return b.prepareResponse(ctx, t, req.Downstream) } func validateTrace( t crossdock.T, target *Downstream, resp *Response, service string, level int, traceID string, sampled bool, baggage string, ) bool { service = fmt.Sprintf("S%d(%s)", level, service) checks := crossdock.Checks(t) s := true s = checks.Equal(traceID, resp.Span.TraceID, "Trace ID must match in %s", service) && s s = checks.Equal(baggage, resp.Span.Baggage, "Baggage must match in %s", service) && s s = checks.Equal(sampled, resp.Span.Sampled, "Sampled must match in %s", service) && s if target != nil { if resp.Downstream == nil { t.Errorf("Should have downstream in S%d(%s)", level, service) s = false } else { s = validateTrace(t, target.Downstream, resp.Downstream, target.HostPort, level+1, traceID, sampled, baggage) && s } } else if resp.Downstream != nil { s = checks.Nil(resp.Downstream, "Should not have downstream in %s", service) && s } return s } func randomBaggage() string { r := utils.NewRand(time.Now().UnixNano()) n := uint64(r.Int63()) return fmt.Sprintf("%x", n) } func (b *Behavior) prepareResponse(ctx context.Context, t crossdock.T, reqDwn *Downstream) (*Response, error) { log.Printf("prepareResponse: reqDwn=%v", reqDwn) logSpan(ctx) observedSpan, err := observeSpan(ctx) if err != nil { return nil, err } resp := &Response{ Span: observedSpan, } if reqDwn != nil { downstreamResp, err := b.callDownstream(ctx, reqDwn) if err != nil { if t != nil { t.Errorf("Error when calling downstream %+v: %s", reqDwn, err) } return nil, err } resp.Downstream = downstreamResp } return resp, nil } func (b *Behavior) callDownstream(ctx context.Context, downstream *Downstream) (*Response, error) { switch tchannel.Format(downstream.Encoding) { case tchannel.JSON: return b.jsonCall(ctx, downstream) case tchannel.Thrift: return b.thriftCall(ctx, downstream) default: return nil, errUnsupportedEncoding } } func observeSpan(ctx context.Context) (*ObservedSpan, error) { span := opentracing.SpanFromContext(ctx) if span == nil { return nil, errNoSpanObserved } sc, ok := span.Context().(jaeger.SpanContext) if !ok { return &ObservedSpan{}, nil } observedSpan := &ObservedSpan{ TraceID: fmt.Sprintf("%x", sc.TraceID()), Sampled: sc.IsSampled(), Baggage: span.BaggageItem(BaggageKey), } log.Printf("Observed span %+v", observedSpan) return observedSpan, nil } func logSpan(ctx context.Context) { if span := opentracing.SpanFromContext(ctx); span != nil { log.Printf("Span %s", span) } }