plugins/input/skywalkingv2/trace_segment_report_handle.go (240 lines of code) (raw):

// Copyright 2021 iLogtail Authors // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at // // http:www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. package skywalkingv2 import ( "context" "crypto/rand" "errors" "fmt" "io" "math" "math/big" "runtime" "strconv" "strings" "google.golang.org/protobuf/proto" "github.com/alibaba/ilogtail/pkg/logger" "github.com/alibaba/ilogtail/pkg/pipeline" "github.com/alibaba/ilogtail/plugins/input/skywalkingv2/skywalking/apm/network/common" "github.com/alibaba/ilogtail/plugins/input/skywalkingv2/skywalking/apm/network/language/agent" v2 "github.com/alibaba/ilogtail/plugins/input/skywalkingv2/skywalking/apm/network/language/agent/v2" "github.com/alibaba/ilogtail/plugins/input/skywalkingv3" ) type TraceSegmentReportHandle struct { RegistryInformationCache context pipeline.Context collector pipeline.Collector compIDMessagingSystemMapping map[int32]string } func panicRecover() { if err := recover(); err != nil { trace := make([]byte, 2048) runtime.Stack(trace, true) logger.Error(context.Background(), "PLUGIN_RUNTIME_ALARM", "skywalking v2 runtime panic error", err, "stack", string(trace)) } } func (t *TraceSegmentReportHandle) Collect(server v2.TraceSegmentReportService_CollectServer) error { defer panicRecover() for { segmentObject, err := server.Recv() if err != nil { if err == io.EOF { return server.SendAndClose(&common.Commands{}) } return err } e := t.collectSegment(server, segmentObject) if e != nil { return server.SendAndClose(&common.Commands{}) } } } func (t *TraceSegmentReportHandle) collectSegment(server v2.TraceSegmentReportService_CollectServer, upstream *agent.UpstreamSegment) error { if len(upstream.GlobalTraceIds) == 0 { return nil } segment := &v2.SegmentObject{} if err := proto.Unmarshal(upstream.GetSegment(), segment); err != nil { return err } jaegerFormat, traceID := getTraceID(upstream.GlobalTraceIds[0]) applicationInstance, ok := t.RegistryInformationCache.findApplicationInstanceRegistryInfo(segment.ServiceInstanceId) if !ok || applicationInstance.application == nil { // 如果Instance不存在,让应用重新注册,会造成丢失部分数据 args := make([]*common.KeyStringValuePair, 0) serializeNumber, _ := rand.Int(rand.Reader, big.NewInt(math.MaxInt64)) args = append(args, &common.KeyStringValuePair{Key: "SerialNumber", Value: fmt.Sprintf("reset-%d", serializeNumber)}) var commands []*common.Command commands = append(commands, &common.Command{Command: "ServiceMetadataReset", Args: args}) return server.SendAndClose(&common.Commands{ Commands: commands, }) } spanMapping := make(map[int32]*v2.SpanObjectV2) for _, span := range segment.Spans { spanMapping[span.SpanId] = span } for _, span := range segment.Spans { _, traceSegmentID := getTraceID(segment.TraceSegmentId) var otTrace *skywalkingv3.OtSpan var err error if jaegerFormat { otTrace, err = t.parseSpan(span, applicationInstance, traceID, traceSegmentID, spanMapping, generateSpanIDByJaeger, generateParentSpanIDByJaeger) } else { otTrace, err = t.parseSpan(span, applicationInstance, traceID, traceSegmentID, spanMapping, generateSpanIDByOriginal, generateParentSpanIDByOriginal) } if otTrace == nil { if err != nil { args := make([]*common.KeyStringValuePair, 0) serializeNumber, _ := rand.Int(rand.Reader, big.NewInt(math.MaxInt64)) args = append(args, &common.KeyStringValuePair{Key: "SerialNumber", Value: fmt.Sprintf("reset-%d", serializeNumber)}) _ = server.SendMsg(common.Command{ Command: "ServiceMetadataReset", Args: args}) } } else { log, err := otTrace.ToLog() if err != nil { logger.Error(t.context.GetRuntimeContext(), "SKYWALKING_TO_OT_TRACE_ERR", "err", err) return err } t.collector.AddRawLog(log) } } return nil } func (t *TraceSegmentReportHandle) parseSpan(span *v2.SpanObjectV2, applicationInstance *ApplicationInstance, traceID string, traceSegmentID string, spanMapping map[int32]*v2.SpanObjectV2, generateSpanID func(applicationInstance *ApplicationInstance, traceSegmentID string, span *v2.SpanObjectV2) string, generateParentSpanID func(ref *v2.SegmentReference) string) (*skywalkingv3.OtSpan, error) { otSpan := skywalkingv3.NewOtSpan() otSpan.Resource = applicationInstance.properties otSpan.Service = applicationInstance.application.applicationName otSpan.Host = applicationInstance.properties[skywalkingv3.AttributeHostName] if span.OperationNameId != 0 { e, ok := t.RegistryInformationCache.findEndpointRegistryInfoByID(span.OperationNameId) if !ok || e == nil { return nil, errors.New("Failed to find OperationName ID") } otSpan.Name = e.endpointName } else { otSpan.Name = span.OperationName } switch { case span.SpanLayer == agent.SpanLayer_MQ: if span.SpanType == agent.SpanType_Entry { otSpan.Kind = skywalkingv3.OpenTracingSpanKindConsumer } else if span.SpanType == agent.SpanType_Exit { otSpan.Kind = skywalkingv3.OpenTracingSpanKindProducer } t.mappingMessageSystemTag(span, otSpan) case span.SpanType == agent.SpanType_Entry: otSpan.Kind = skywalkingv3.OpenTracingSpanKindServer case span.SpanType == agent.SpanType_Exit: otSpan.Kind = skywalkingv3.OpenTracingSpanKindClient case span.SpanType == agent.SpanType_Local: otSpan.Kind = skywalkingv3.OpenTracingSpanKindInternal default: otSpan.Kind = skywalkingv3.OpenTracingSpanKindUnspecified } otSpan.TraceID = traceID otSpan.SpanID = generateSpanID(applicationInstance, traceSegmentID, span) if span.ParentSpanId < 0 { otSpan.ParentSpanID = "" } else { otSpan.ParentSpanID = generateSpanID(applicationInstance, traceSegmentID, spanMapping[span.ParentSpanId]) } otSpan.Logs = make([]map[string]string, len(span.Logs)) for i, log := range span.Logs { logEvent := make(map[string]string) logEvent["time"] = strconv.FormatInt(log.Time, 10) for _, kv := range log.Data { logEvent[kv.Key] = kv.Value if kv.Key == "error.kind" && len(kv.Value) > 0 { otSpan.StatusMessage = kv.Value } } otSpan.Logs[i] = logEvent } otSpan.Links = make([]*skywalkingv3.OtSpanRef, len(span.Refs)) if len(span.Refs) > 0 { for i, ref := range span.Refs { spanRef := &skywalkingv3.OtSpanRef{ TraceID: traceID, SpanID: generateParentSpanID(ref), } otSpan.Links[i] = spanRef } otSpan.ParentSpanID = generateParentSpanID(span.Refs[0]) } otSpan.Start = span.StartTime * 1000 otSpan.End = span.EndTime * 1000 otSpan.Duration = 1000 * (span.EndTime - span.StartTime) if len(span.Peer) > 0 { hostport := strings.Split(span.Peer, ":") otSpan.Attribute[skywalkingv3.AttributeNetPeerIP] = hostport[0] if len(hostport) == 2 { otSpan.Attribute[skywalkingv3.AttributeNetPeerPort] = hostport[1] } } if len(span.Tags) > 0 { for _, tag := range span.Tags { otKey, ok := skywalkingv3.OtSpanTagsMapping[tag.Key] if ok { otSpan.Attribute[otKey] = tag.Value } else { if skywalkingTopicKey == tag.Key { otSpan.Attribute[skywalkingv3.AttributeMessagingDestinationKind] = "topic" otSpan.Attribute[skywalkingv3.AttributeMessagingDestination] = tag.Value } else if skywalkingQueueKey == tag.Key { otSpan.Attribute[skywalkingv3.AttributeMessagingDestinationKind] = "queue" otSpan.Attribute[skywalkingv3.AttributeMessagingDestination] = tag.Value } otSpan.Attribute[tag.Key] = tag.Value } } } if span.IsError { otSpan.StatusCode = skywalkingv3.StatusCodeError } else { otSpan.StatusCode = skywalkingv3.StatusCodeOk } return otSpan, nil } func generateParentSpanIDByJaeger(ref *v2.SegmentReference) string { return fmt.Sprintf("%08x", uint32(ref.ParentServiceInstanceId)) + fmt.Sprintf("%08x", uint32(ref.ParentSpanId)) } func generateParentSpanIDByOriginal(ref *v2.SegmentReference) string { return convertUniIDToString(ref.ParentTraceSegmentId) + "." + strconv.FormatInt(int64(ref.ParentSpanId), 10) } func generateSpanIDByJaeger(applicationInstance *ApplicationInstance, traceSegmentID string, span *v2.SpanObjectV2) string { return traceSegmentID[len(traceSegmentID)-12:] + fmt.Sprintf("%04x", span.SpanId)[0:4] } func generateSpanIDByOriginal(applicationInstance *ApplicationInstance, traceSegmentID string, span *v2.SpanObjectV2) string { return traceSegmentID + "." + strconv.FormatInt(int64(span.SpanId), 10) } func getTraceID(u *agent.UniqueId) (jeagerFormat bool, traceID string) { if len(u.IdParts) == 0 { return false, "" } if u.IdParts[0] == 648495579 { jeagerFormat = true } if jeagerFormat { for i := 1; i < len(u.IdParts); i++ { traceID += fmt.Sprintf("%016x", uint64(u.IdParts[i])) } } else { for _, part := range u.IdParts { traceID += fmt.Sprintf("%d.", part) } traceID = traceID[0 : len(traceID)-1] } return jeagerFormat, traceID } func (t *TraceSegmentReportHandle) mappingMessageSystemTag(span *v2.SpanObjectV2, otSpan *skywalkingv3.OtSpan) { messageSystem, finded := t.compIDMessagingSystemMapping[span.ComponentId] if finded { otSpan.Attribute[skywalkingv3.AttributeMessagingSystem] = messageSystem } else { otSpan.Attribute[skywalkingv3.AttributeMessagingSystem] = "MessagingSystem" } }