pkg/translator/zipkin/zipkinthriftconverter/to_domain.go (330 lines of code) (raw):

// Copyright The OpenTelemetry Authors // Copyright (c) 2019 The Jaeger Authors. // Copyright (c) 2017 Uber Technologies, Inc. // SPDX-License-Identifier: Apache-2.0 package zipkin // import "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/translator/zipkin/zipkinthriftconverter" import ( "bytes" "encoding/base64" "encoding/binary" "encoding/json" "errors" "fmt" "github.com/jaegertracing/jaeger-idl/model/v1" "github.com/jaegertracing/jaeger-idl/thrift-gen/zipkincore" "go.opentelemetry.io/otel/trace" ) const ( // UnknownServiceName is serviceName we give to model.Process if we cannot find it anywhere in a Zipkin span UnknownServiceName = "unknown-service-name" component = "component" peerservice = "peer.service" peerHostIPv4 = "peer.ipv4" peerHostIPv6 = "peer.ipv6" peerPort = "peer.port" ) var ( coreAnnotations = map[string]string{ zipkincore.SERVER_RECV: trace.SpanKindServer.String(), zipkincore.SERVER_SEND: trace.SpanKindServer.String(), zipkincore.CLIENT_RECV: trace.SpanKindClient.String(), zipkincore.CLIENT_SEND: trace.SpanKindClient.String(), } // Some tags on Zipkin spans really describe the process emitting them rather than an individual span. // Once all clients are upgraded to use native Jaeger model, this won't be happenning, but for now // we remove these tags from the span and store them in the Process. processTagAnnotations = map[string]string{ "jaegerClient": "jaeger.version", // transform this tag name to client.version "jaeger.hostname": "hostname", // transform this tag name to hostname "jaeger.version": "jaeger.version", // keep this key as is } trueByteSlice = []byte{1} // DefaultLogFieldKey is the log field key which translates directly into Zipkin's Annotation.Value, // provided it's the only field in the log. // In all other cases the fields are encoded into Annotation.Value as JSON string. // TODO move to domain model DefaultLogFieldKey = "event" // IPTagName is the Jaeger tag name for an IPv4/IPv6 IP address. // TODO move to domain model IPTagName = "ip" ) // ToDomain transforms a trace in zipkin.thrift format into model.Trace. // The transformation assumes that all spans have the same Trace ID. // A valid model.Trace is always returned, even when there are errors. // The errors are more of an "fyi", describing issues in the data. // TODO consider using different return type instead of `error`. func ToDomain(zSpans []*zipkincore.Span) (*model.Trace, error) { return toDomain{}.ToDomain(zSpans) } // ToDomainSpan transforms a span in zipkin.thrift format into model.Span. // A valid model.Span is always returned, even when there are errors. // The errors are more of an "fyi", describing issues in the data. // TODO consider using different return type instead of `error`. func ToDomainSpan(zSpan *zipkincore.Span) ([]*model.Span, error) { return toDomain{}.ToDomainSpans(zSpan) } type toDomain struct{} func (td toDomain) ToDomain(zSpans []*zipkincore.Span) (*model.Trace, error) { var errs []error processes := newProcessHashtable() trc := &model.Trace{} for _, zSpan := range zSpans { jSpans, err := td.ToDomainSpans(zSpan) if err != nil { errs = append(errs, err) } for _, jSpan := range jSpans { // remove duplicate Process instances jSpan.Process = processes.add(jSpan.Process) trc.Spans = append(trc.Spans, jSpan) } } return trc, errors.Join(errs...) } func (td toDomain) ToDomainSpans(zSpan *zipkincore.Span) ([]*model.Span, error) { jSpans := td.transformSpan(zSpan) jProcess, err := td.generateProcess(zSpan) for _, jSpan := range jSpans { jSpan.Process = jProcess } return jSpans, err } func (toDomain) findAnnotation(zSpan *zipkincore.Span, value string) *zipkincore.Annotation { for _, ann := range zSpan.Annotations { if ann.Value == value { return ann } } return nil } // transformSpan transforms a zipkin span into a Jaeger span func (td toDomain) transformSpan(zSpan *zipkincore.Span) []*model.Span { tags := td.getTags(zSpan.BinaryAnnotations, td.isSpanTag) if spanKindTag, ok := td.getSpanKindTag(zSpan.Annotations); ok { tags = append(tags, spanKindTag) } var traceIDHigh int64 if zSpan.TraceIDHigh != nil { traceIDHigh = *zSpan.TraceIDHigh } traceID := model.NewTraceID(uint64(traceIDHigh), uint64(zSpan.TraceID)) var refs []model.SpanRef if zSpan.ParentID != nil { parentSpanID := model.NewSpanID(uint64(*zSpan.ParentID)) refs = model.MaybeAddParentSpanID(traceID, parentSpanID, refs) } flags := td.getFlags(zSpan) startTime, duration := td.getStartTimeAndDuration(zSpan) result := []*model.Span{{ TraceID: traceID, SpanID: model.NewSpanID(uint64(zSpan.ID)), OperationName: zSpan.Name, References: refs, Flags: flags, StartTime: model.EpochMicrosecondsAsTime(uint64(startTime)), Duration: model.MicrosecondsAsDuration(uint64(duration)), Tags: tags, Logs: td.getLogs(zSpan.Annotations), }} cs := td.findAnnotation(zSpan, zipkincore.CLIENT_SEND) sr := td.findAnnotation(zSpan, zipkincore.SERVER_RECV) if cs != nil && sr != nil { // if the span is client and server we split it into two separate spans s := &model.Span{ TraceID: traceID, SpanID: model.NewSpanID(uint64(zSpan.ID)), OperationName: zSpan.Name, References: refs, Flags: flags, } // if the first span is a client span we create server span and vice-versa. if result[0].IsRPCClient() { s.Tags = []model.KeyValue{model.SpanKindTag(model.SpanKindServer)} s.StartTime = model.EpochMicrosecondsAsTime(uint64(sr.Timestamp)) if ss := td.findAnnotation(zSpan, zipkincore.SERVER_SEND); ss != nil { s.Duration = model.MicrosecondsAsDuration(uint64(ss.Timestamp - sr.Timestamp)) } } else { s.Tags = []model.KeyValue{model.SpanKindTag(model.SpanKindClient)} s.StartTime = model.EpochMicrosecondsAsTime(uint64(cs.Timestamp)) if cr := td.findAnnotation(zSpan, zipkincore.CLIENT_RECV); cr != nil { s.Duration = model.MicrosecondsAsDuration(uint64(cr.Timestamp - cs.Timestamp)) } } result = append(result, s) } return result } // getFlags takes a Zipkin Span and deduces the proper flags settings func (toDomain) getFlags(zSpan *zipkincore.Span) model.Flags { f := model.Flags(0) if zSpan.Debug { f.SetDebug() } return f } // Get a correct start time to use for the span if it's not set directly func (td toDomain) getStartTimeAndDuration(zSpan *zipkincore.Span) (timestamp, duration int64) { timestamp = zSpan.GetTimestamp() duration = zSpan.GetDuration() if timestamp == 0 { cs := td.findAnnotation(zSpan, zipkincore.CLIENT_SEND) sr := td.findAnnotation(zSpan, zipkincore.SERVER_RECV) if cs != nil { timestamp = cs.Timestamp cr := td.findAnnotation(zSpan, zipkincore.CLIENT_RECV) if cr != nil && duration == 0 { duration = cr.Timestamp - cs.Timestamp } } else if sr != nil { timestamp = sr.Timestamp ss := td.findAnnotation(zSpan, zipkincore.SERVER_SEND) if ss != nil && duration == 0 { duration = ss.Timestamp - sr.Timestamp } } } return timestamp, duration } // generateProcess takes a Zipkin Span and produces a model.Process. // An optional error may also be returned, but it is not fatal. func (td toDomain) generateProcess(zSpan *zipkincore.Span) (*model.Process, error) { tags := td.getTags(zSpan.BinaryAnnotations, td.isProcessTag) for i, tag := range tags { tags[i].Key = processTagAnnotations[tag.Key] } serviceName, ipv4, err := td.findServiceNameAndIP(zSpan) if ipv4 != 0 { // If the ip process tag already exists, don't add it again tags = append(tags, model.Int64(IPTagName, int64(uint64(ipv4)))) } return model.NewProcess(serviceName, tags), err } func (td toDomain) findServiceNameAndIP(zSpan *zipkincore.Span) (string, int32, error) { for _, a := range zSpan.Annotations { if td.isCoreAnnotation(a) && a.Host != nil && a.Host.ServiceName != "" { return a.Host.ServiceName, a.Host.Ipv4, nil } } for _, a := range zSpan.BinaryAnnotations { if a.Key == zipkincore.LOCAL_COMPONENT && a.Host != nil && a.Host.ServiceName != "" { return a.Host.ServiceName, a.Host.Ipv4, nil } } // If no core annotations exist, use the service name from any annotation for _, a := range zSpan.Annotations { if a.Host != nil && a.Host.ServiceName != "" { return a.Host.ServiceName, a.Host.Ipv4, nil } } // Tracer can also report a span with just binary annotation/s for _, a := range zSpan.BinaryAnnotations { if a.Host != nil && a.Host.ServiceName != "" { return a.Host.ServiceName, a.Host.Ipv4, nil } } err := fmt.Errorf( "cannot find service name in Zipkin span [traceID=%x, spanID=%x]", uint64(zSpan.TraceID), uint64(zSpan.ID)) return UnknownServiceName, 0, err } func (toDomain) isCoreAnnotation(annotation *zipkincore.Annotation) bool { _, ok := coreAnnotations[annotation.Value] return ok } func (toDomain) isProcessTag(binaryAnnotation *zipkincore.BinaryAnnotation) bool { _, ok := processTagAnnotations[binaryAnnotation.Key] return ok } func (td toDomain) isSpanTag(binaryAnnotation *zipkincore.BinaryAnnotation) bool { return !td.isProcessTag(binaryAnnotation) } type tagPredicate func(*zipkincore.BinaryAnnotation) bool func (td toDomain) getTags(binAnnotations []*zipkincore.BinaryAnnotation, tagInclude tagPredicate) []model.KeyValue { // this will be memory intensive due to how slices work, and it's specifically because we have to filter out // some binary annotations. improvement here would be just collecting the indices in binAnnotations we want. var retMe []model.KeyValue for _, annotation := range binAnnotations { if !tagInclude(annotation) { continue } switch annotation.Key { case zipkincore.LOCAL_COMPONENT: value := string(annotation.Value) tag := model.String(component, value) retMe = append(retMe, tag) case zipkincore.SERVER_ADDR, zipkincore.CLIENT_ADDR, zipkincore.MESSAGE_ADDR: retMe = td.getPeerTags(annotation.Host, retMe) default: tag, err := td.transformBinaryAnnotation(annotation) if err != nil { encoded := base64.StdEncoding.EncodeToString(annotation.Value) errMsg := fmt.Sprintf("Cannot parse Zipkin value %s: %v", encoded, err) tag = model.String(annotation.Key, errMsg) } retMe = append(retMe, tag) } } return retMe } func (toDomain) transformBinaryAnnotation(binaryAnnotation *zipkincore.BinaryAnnotation) (model.KeyValue, error) { switch binaryAnnotation.AnnotationType { case zipkincore.AnnotationType_BOOL: vBool := bytes.Equal(binaryAnnotation.Value, trueByteSlice) return model.Bool(binaryAnnotation.Key, vBool), nil case zipkincore.AnnotationType_BYTES: return model.Binary(binaryAnnotation.Key, binaryAnnotation.Value), nil case zipkincore.AnnotationType_DOUBLE: var d float64 if err := bytesToNumber(binaryAnnotation.Value, &d); err != nil { return model.KeyValue{}, err } return model.Float64(binaryAnnotation.Key, d), nil case zipkincore.AnnotationType_I16: var i int16 if err := bytesToNumber(binaryAnnotation.Value, &i); err != nil { return model.KeyValue{}, err } return model.Int64(binaryAnnotation.Key, int64(i)), nil case zipkincore.AnnotationType_I32: var i int32 if err := bytesToNumber(binaryAnnotation.Value, &i); err != nil { return model.KeyValue{}, err } return model.Int64(binaryAnnotation.Key, int64(i)), nil case zipkincore.AnnotationType_I64: var i int64 if err := bytesToNumber(binaryAnnotation.Value, &i); err != nil { return model.KeyValue{}, err } return model.Int64(binaryAnnotation.Key, i), nil case zipkincore.AnnotationType_STRING: return model.String(binaryAnnotation.Key, string(binaryAnnotation.Value)), nil } return model.KeyValue{}, fmt.Errorf("unknown zipkin annotation type: %d", binaryAnnotation.AnnotationType) } func bytesToNumber(b []byte, number any) error { buf := bytes.NewReader(b) return binary.Read(buf, binary.BigEndian, number) } func (td toDomain) getLogs(annotations []*zipkincore.Annotation) []model.Log { var retMe []model.Log for _, a := range annotations { // If the annotation has no value, throw it out if a.Value == "" { continue } if _, ok := coreAnnotations[a.Value]; ok { // skip core annotations continue } logFields := td.getLogFields(a) jLog := model.Log{ Timestamp: model.EpochMicrosecondsAsTime(uint64(a.Timestamp)), Fields: logFields, } retMe = append(retMe, jLog) } return retMe } func (toDomain) getLogFields(annotation *zipkincore.Annotation) []model.KeyValue { var logFields map[string]string // Since Zipkin format does not support kv-logging, some clients encode those Logs // as annotations with JSON value. Therefore, we try JSON decoding first. if err := json.Unmarshal([]byte(annotation.Value), &logFields); err == nil { fields := make([]model.KeyValue, len(logFields)) i := 0 for k, v := range logFields { fields[i] = model.String(k, v) i++ } return fields } return []model.KeyValue{model.String(DefaultLogFieldKey, annotation.Value)} } func (toDomain) getSpanKindTag(annotations []*zipkincore.Annotation) (model.KeyValue, bool) { for _, a := range annotations { if spanKind, ok := coreAnnotations[a.Value]; ok { return model.SpanKindTag(model.SpanKind(spanKind)), true } } return model.KeyValue{}, false } func (toDomain) getPeerTags(endpoint *zipkincore.Endpoint, tags []model.KeyValue) []model.KeyValue { if endpoint == nil { return tags } tags = append(tags, model.String(peerservice, endpoint.ServiceName)) if endpoint.Ipv4 != 0 { ipv4 := int64(uint32(endpoint.Ipv4)) tags = append(tags, model.Int64(peerHostIPv4, ipv4)) } if endpoint.Ipv6 != nil { // Zipkin defines Ipv6 field as: "IPv6 host address packed into 16 bytes. Ex Inet6Address.getBytes()". // https://github.com/openzipkin/zipkin-api/blob/master/thrift/zipkinCore.thrift#L305 tags = append(tags, model.Binary(peerHostIPv6, endpoint.Ipv6)) } if endpoint.Port != 0 { port := int64(uint16(endpoint.Port)) tags = append(tags, model.Int64(peerPort, port)) } return tags }