store/kustoSpan.go (255 lines of code) (raw):

package store import ( "encoding/json" "fmt" "reflect" "strconv" "strings" "time" "github.com/Azure/azure-kusto-go/kusto/data/value" "github.com/hashicorp/go-hclog" "github.com/jaegertracing/jaeger/model" "github.com/jaegertracing/jaeger/plugin/storage/es/spanstore/dbmodel" ) type kustoSpan struct { TraceID string `kusto:"TraceID"` SpanID string `kusto:"SpanID"` SpanName string `kusto:"SpanName"` References value.Dynamic `kusto:"References"` Flags int32 `kusto:"Flags"` StartTime time.Time `kusto:"StartTime"` Duration int64 `kusto:"Duration"` Tags value.Dynamic `kusto:"Tags"` Logs value.Dynamic `kusto:"Logs"` Links []link `kusto:"Links"` ProcessServiceName string `kusto:"ProcessServiceName"` ProcessTags value.Dynamic `kusto:"ProcessTags"` ProcessID string `kusto:"ProcessID"` SpanKind string `kusto:"SpanKind"` SpanStatus string `kusto:"SpanStatus"` } type link struct { TraceID dbmodel.TraceID `json:"TraceID"` SpanID dbmodel.SpanID `json:"SpanID"` RefType string `kusto:"RefType"` TraceState string `kusto:"TraceState,omitempty"` SpanLinkAttributes value.Dynamic `kusto:"SpanLinkAttributes,omitempty"` } type event struct { EventName string `kusto:"EventName"` Timestamp string `kusto:"Timestamp"` EventAttributes map[string]interface{} `kusto:"EventAttributes"` } const ( // TagDotReplacementCharacter state which character should replace the dot in dynamic column TagDotReplacementCharacter = "_" ) func transformKustoSpanToModelSpan(kustoSpan *kustoSpan, logger hclog.Logger) (*model.Span, error) { // eMin":"datetime(2024-03-13T15:56:28.628Z)"}: EXTRA_VALUE_AT_END=<nil> @module=jaeger-kusto timestamp=2024-03-15T15:56:28.634Z //2024-03-15T15:56:29.206Z [ERROR] jaeger-kusto: Error parsing span to domain. Error not a valid SpanRefType string . The TraceId is d1b06c73d963045e657158dbd0ccf6d9 and the SpanId is cfb683d327e4dd90 : @module=jaeger-kusto timestamp=2024-03-15T15:56:29.205Z // spanReferences, err := transformReferencesToLinks(kustoSpan, logger) if err != nil { logger.Error(fmt.Sprintf("Error in Unmarshal Refs %s. TraceId: %s SpanId: %s ", kustoSpan.Tags.String(), kustoSpan.TraceID, kustoSpan.SpanID), err) return nil, err } var tags map[string]interface{} err = json.Unmarshal(kustoSpan.Tags.Value, &tags) if err != nil { logger.Error(fmt.Sprintf("Error in Unmarshal tags %s. TraceId: %s SpanId: %s ", kustoSpan.Tags.String(), kustoSpan.TraceID, kustoSpan.SpanID), err) return nil, err } // Fix issues where there are JSON Array types in tags. On nested tag types convert arrays to string. Else this causes issues in span parsing in Jaeger span transformations for key, element := range tags { elementString := fmt.Sprint(element) isArray := len(elementString) > 0 && elementString[0] == '[' if isArray { tags[key] = elementString } } // https://opentelemetry.io/docs/specs/otel/trace/sdk_exporters/jaeger/#status switch kustoSpan.SpanStatus { case "STATUS_CODE_ERROR": tags["otel.status_code"] = "ERROR" tags["error"] = true case "STATUS_CODE_OK": tags["otel.status_code"] = "OK" default: break } // https://opentelemetry.io/docs/specs/otel/trace/sdk_exporters/jaeger/#spankind switch kustoSpan.SpanKind { case "SPAN_KIND_SERVER": tags["span.kind"] = "server" case "SPAN_KIND_CLIENT": tags["span.kind"] = "client" case "SPAN_KIND_CONSUMER": tags["span.kind"] = "consumer" case "SPAN_KIND_PRODUCER": tags["span.kind"] = "producer" default: break } logs, err := transformEventsToLogs(kustoSpan, logger) if err != nil { logger.Error(fmt.Sprintf("Error in transform (transformEventsToLogs) %s. TraceId: %s SpanId: %s ", kustoSpan.Tags.String(), kustoSpan.TraceID, kustoSpan.SpanID), err) return nil, err } process := dbmodel.Process{ ServiceName: kustoSpan.ProcessServiceName, Tags: nil, Tag: nil, } escapeProcessTags(kustoSpan.ProcessTags.Value) // Replace the special chars(including start and end []) for correct JSON parsing replacer := strings.NewReplacer(":[", ":\"[", "],", "]\",", "\\", "") processTag := []byte(replacer.Replace(string(kustoSpan.ProcessTags.Value))) err = json.Unmarshal(processTag, &process.Tag) // See if this parsing yielded an error ? if err != nil { logger.Error(fmt.Sprintf("ERROR in Unmarshal processTags %s. TraceId: %s SpanId: %s ", string(kustoSpan.ProcessTags.Value), kustoSpan.TraceID, kustoSpan.SpanID), err) return nil, err } jsonSpan := &dbmodel.Span{ TraceID: dbmodel.TraceID(kustoSpan.TraceID), SpanID: dbmodel.SpanID(kustoSpan.SpanID), Flags: uint32(kustoSpan.Flags), OperationName: kustoSpan.SpanName, References: spanReferences, StartTime: uint64(kustoSpan.StartTime.UnixMicro()), StartTimeMillis: uint64(kustoSpan.StartTime.UnixMilli()), Duration: uint64(kustoSpan.Duration), Tags: nil, Tag: tags, Logs: logs, Process: process, } spanConverter := dbmodel.NewToDomain(TagDotReplacementCharacter) convertedSpan, err := spanConverter.SpanToDomain(jsonSpan) if err != nil { logger.Error(fmt.Sprintf("Error parsing span to domain. Error %s. The TraceId is %s and the SpanId is %s ", err, kustoSpan.TraceID, kustoSpan.SpanID)) return nil, err } span := &model.Span{ TraceID: convertedSpan.TraceID, SpanID: convertedSpan.SpanID, OperationName: kustoSpan.SpanName, References: convertedSpan.References, Flags: convertedSpan.Flags, StartTime: kustoSpan.StartTime, Duration: time.Duration(kustoSpan.Duration) * time.Microsecond, Tags: convertedSpan.Tags, Logs: convertedSpan.Logs, Process: convertedSpan.Process, } return span, err } func transformReferencesToLinks(kustoSpan *kustoSpan, logger hclog.Logger) ([]dbmodel.Reference, error) { // There are 2 parts in the links. The first one is the CHILD_OF hierarchy and the second one is the FOLLOWS_FROM hierarchy // Ref : https://opentelemetry.io/docs/specs/otel/trace/sdk_exporters/jaeger/#links // Note that we can convert SpanLinkAttributes to logs too. But this is not added at the moment var childOfRefs []dbmodel.Reference referenceValue := kustoSpan.References.Value if len(referenceValue) > 0 { err := json.Unmarshal(referenceValue, &childOfRefs) if err != nil { logger.Error(fmt.Sprintf("Error in Unmarshal CO refs %s. TraceId: %s SpanId: %s. References: %s", kustoSpan.References.String(), kustoSpan.TraceID, kustoSpan.SpanID, kustoSpan.References.Value), err) return nil, err } } var followsFromRefs []dbmodel.Reference for _, ref := range kustoSpan.Links { if ref.TraceID == "" || ref.SpanID == "" { // Skip the empty references logger.Warn(fmt.Sprintf("Empty link TraceID or SpanID for RefType %s . TraceId: %s SpanId: %s", ref.RefType, kustoSpan.TraceID, kustoSpan.SpanID)) } else { followsFromRefs = append(followsFromRefs, dbmodel.Reference{ RefType: dbmodel.FollowsFrom, TraceID: ref.TraceID, SpanID: ref.SpanID, }) } } // Combine the childOfRefs and followsFromRefs spanRefs := append(childOfRefs, followsFromRefs...) return spanRefs, nil } // Ref : https://opentelemetry.io/docs/specs/otel/trace/sdk_exporters/jaeger/#events func transformEventsToLogs(kustoSpan *kustoSpan, logger hclog.Logger) ([]dbmodel.Log, error) { var events []event err := json.Unmarshal(kustoSpan.Logs.Value, &events) if err != nil { return nil, err } // Get the events field from events and convert it to logs var logs []dbmodel.Log // Map event to logs that can be set. ref: https://opentelemetry.io/docs/reference/specification/trace/sdk_exporters/jaeger/#events // Set all the events' timestam and attibute, to log's timestamp and fields by iterating over span events for _, evt := range events { log := dbmodel.Log{} var kvs []dbmodel.KeyValue timestamp := evt.Timestamp if timestamp != "" { t, terr := time.Parse(time.RFC3339Nano, timestamp) if terr != nil { logger.Warn(fmt.Sprintf("Error parsing log timestamp. Error %s. TraceId: %s SpanId: %s & timestamp: %s ", terr.Error(), kustoSpan.TraceID, kustoSpan.SpanID, timestamp)) } else { log.Timestamp = uint64(t.UnixMicro()) } } // EventName should be added as log's field. kvs = append(kvs, dbmodel.KeyValue{ Key: "event", Value: evt.EventName, Type: dbmodel.StringType, }) for ek, ev := range evt.EventAttributes { kv := dbmodel.KeyValue{ Key: ek, Value: fmt.Sprint(ev), Type: dbmodel.ValueType(strings.ToLower(reflect.TypeOf(ev).String())), } kvs = append(kvs, kv) } log.Fields = kvs logs = append(logs, log) } return logs, nil } // escapeProcessTags replaces the double quotes with single quotes in the process tags list func escapeProcessTags(processTagsString []byte) { var insideSquareBrackets bool for i := 0; i < len(processTagsString); i++ { if processTagsString[i] == '[' { insideSquareBrackets = true } else if processTagsString[i] == ']' { insideSquareBrackets = false } else if insideSquareBrackets && processTagsString[i] == '"' { processTagsString[i] = '\'' } } } func getTagsValues(tags []model.KeyValue) []string { var values []string for i := range tags { values = append(values, tags[i].VStr) } return values } // TransformSpanToStringArray converts span to string ready for Kusto ingestion func TransformSpanToStringArray(span *model.Span) ([]string, error) { spanConverter := dbmodel.NewFromDomain(true, getTagsValues(span.Tags), TagDotReplacementCharacter) jsonSpan := spanConverter.FromDomainEmbedProcess(span) references, err := json.Marshal(jsonSpan.References) if err != nil { return nil, err } tags, err := json.Marshal(jsonSpan.Tag) if err != nil { return nil, err } logs, err := json.Marshal(jsonSpan.Logs) if err != nil { return nil, err } processTags, err := json.Marshal(jsonSpan.Process.Tag) if err != nil { return nil, err } kustoStringSpan := []string{ span.TraceID.String(), span.SpanID.String(), span.OperationName, string(references), strconv.FormatUint(uint64(span.Flags), 10), span.StartTime.Format(time.RFC3339Nano), value.Timespan{Value: span.Duration, Valid: true}.Marshal(), string(tags), string(logs), span.Process.ServiceName, string(processTags), span.ProcessID, } return kustoStringSpan, err }