plugins/input/skywalkingv2/trace_segment_handle.go (205 lines of code) (raw):
// Copyright 2021 iLogtail Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http:www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package skywalkingv2
import (
"errors"
"fmt"
"strconv"
"strings"
"github.com/gogo/protobuf/proto"
"github.com/alibaba/ilogtail/pkg/logger"
"github.com/alibaba/ilogtail/pkg/pipeline"
"github.com/alibaba/ilogtail/plugins/input/skywalkingv2/skywalking/apm/network/language/agent"
"github.com/alibaba/ilogtail/plugins/input/skywalkingv3"
)
const (
skywalkingTopicKey = "mq.topic"
skywalkingQueueKey = "mq.queue"
)
type TraceSegmentHandle struct {
RegistryInformationCache
context pipeline.Context
collector pipeline.Collector
compIDMessagingSystemMapping map[int32]string
}
func (t *TraceSegmentHandle) Collect(server agent.TraceSegmentService_CollectServer) error {
defer panicRecover()
for {
segmentObject, err := server.Recv()
if err != nil {
// 当Logtail重启后,使用这个版本的SW的服务会出现问题,由于SW没有下发命令的功能,所以没办法重新注册
// 出现数据上不来的问题,只能重启服务
return server.SendAndClose(&agent.Downstream{})
}
err = t.collectSegment(segmentObject)
if err != nil {
// 当Logtail重启后,使用这个版本的SW的服务会出现问题,由于SW没有下发命令的功能,所以没办法重新注册
// 出现数据上不来的问题,只能重启服务
return server.SendAndClose(&agent.Downstream{})
}
}
}
func (t *TraceSegmentHandle) collectSegment(upstream *agent.UpstreamSegment) error {
if len(upstream.GlobalTraceIds) == 0 {
return nil
}
segment := &agent.TraceSegmentObject{}
traceID := convertUniIDToString(upstream.GlobalTraceIds[0])
if e := proto.Unmarshal(upstream.GetSegment(), segment); e != nil {
return e
}
applicationInstance, ok := t.RegistryInformationCache.findApplicationInstanceRegistryInfo(segment.ApplicationInstanceId)
if !ok {
return errors.New("Application Not found")
}
for _, span := range segment.Spans {
if otTrace := t.parseSpan(span, applicationInstance, traceID, convertUniIDToString(segment.TraceSegmentId)); otTrace != nil {
log, err := otTrace.ToLog()
if err != nil {
logger.Error(t.context.GetRuntimeContext(), "SKYWALKING_TO_OT_TRACE_ERR", "err", err)
return err
}
t.collector.AddRawLog(log)
}
}
return nil
}
func (t *TraceSegmentHandle) parseSpan(span *agent.SpanObject, applicationInstance *ApplicationInstance, traceID string, traceSegmentID string) *skywalkingv3.OtSpan {
otSpan := skywalkingv3.NewOtSpan()
otSpan.Resource = applicationInstance.properties
otSpan.Service = applicationInstance.application.applicationName
otSpan.Host = applicationInstance.properties[skywalkingv3.AttributeHostName]
if span.OperationNameId != 0 {
e, ok := t.RegistryInformationCache.findEndpointRegistryInfoByID(span.OperationNameId)
if !ok {
return nil
}
otSpan.Name = e.endpointName
} else {
otSpan.Name = span.OperationName
}
switch {
case span.SpanLayer == agent.SpanLayer_MQ:
if span.SpanType == agent.SpanType_Entry {
otSpan.Kind = skywalkingv3.OpenTracingSpanKindConsumer
} else if span.SpanType == agent.SpanType_Exit {
otSpan.Kind = skywalkingv3.OpenTracingSpanKindProducer
}
t.mappingMessageSystemTag(span, otSpan)
case span.SpanType == agent.SpanType_Entry:
otSpan.Kind = skywalkingv3.OpenTracingSpanKindServer
case span.SpanType == agent.SpanType_Exit:
otSpan.Kind = skywalkingv3.OpenTracingSpanKindClient
mappingDatabaseTag(span, otSpan)
case span.SpanType == agent.SpanType_Local:
otSpan.Kind = skywalkingv3.OpenTracingSpanKindInternal
default:
otSpan.Kind = skywalkingv3.OpenTracingSpanKindUnspecified
}
otSpan.TraceID = traceID
otSpan.SpanID = traceSegmentID + "." + strconv.FormatInt(int64(span.SpanId), 10)
if span.ParentSpanId < 0 {
otSpan.ParentSpanID = ""
} else {
otSpan.ParentSpanID = traceSegmentID + "." + strconv.FormatInt(int64(span.ParentSpanId), 10)
}
otSpan.Logs = make([]map[string]string, len(span.Logs))
for i, log := range span.Logs {
logEvent := make(map[string]string)
logEvent["time"] = strconv.FormatInt(log.Time, 10)
for _, kv := range log.Data {
logEvent[kv.Key] = kv.Value
if kv.Key == "error.kind" && len(kv.Value) > 0 {
otSpan.StatusMessage = kv.Value
}
}
otSpan.Logs[i] = logEvent
}
otSpan.Links = make([]*skywalkingv3.OtSpanRef, len(span.Refs))
if len(span.Refs) > 0 {
for i, ref := range span.Refs {
parentTraceSegmentID := ""
for _, part := range ref.ParentTraceSegmentId.IdParts {
parentTraceSegmentID += fmt.Sprintf("%d", part)
}
spanRef := &skywalkingv3.OtSpanRef{
TraceID: traceID,
SpanID: convertUniIDToString(ref.ParentTraceSegmentId) + "." + strconv.FormatInt(int64(ref.ParentSpanId), 10),
TraceState: "",
Attributes: nil,
}
otSpan.Links[i] = spanRef
}
otSpan.ParentSpanID = convertUniIDToString(span.Refs[0].ParentTraceSegmentId) + "." + strconv.FormatInt(int64(span.Refs[0].ParentSpanId), 10)
}
otSpan.Start = span.StartTime * 1000
otSpan.End = span.EndTime * 1000
otSpan.Duration = 1000 * (span.EndTime - span.StartTime)
if len(span.Peer) > 0 {
hostport := strings.Split(span.Peer, ":")
otSpan.Attribute[skywalkingv3.AttributeNetPeerIP] = hostport[0]
if len(hostport) == 2 {
otSpan.Attribute[skywalkingv3.AttributeNetPeerPort] = hostport[1]
}
}
if len(span.Tags) > 0 {
for _, tag := range span.Tags {
otKey, ok := skywalkingv3.OtSpanTagsMapping[tag.Key]
if ok {
otSpan.Attribute[otKey] = tag.Value
} else {
if skywalkingTopicKey == tag.Key {
otSpan.Attribute[skywalkingv3.AttributeMessagingDestinationKind] = "topic"
otSpan.Attribute[skywalkingv3.AttributeMessagingDestination] = tag.Value
} else if skywalkingQueueKey == tag.Key {
otSpan.Attribute[skywalkingv3.AttributeMessagingDestinationKind] = "queue"
otSpan.Attribute[skywalkingv3.AttributeMessagingDestination] = tag.Value
}
otSpan.Attribute[tag.Key] = tag.Value
}
}
}
if span.IsError {
otSpan.StatusCode = skywalkingv3.StatusCodeError
} else {
otSpan.StatusCode = skywalkingv3.StatusCodeOk
}
switch {
case span.SpanLayer == agent.SpanLayer_MQ:
t.mappingMessageSystemTag(span, otSpan)
case span.SpanType == agent.SpanType_Exit:
mappingDatabaseTag(span, otSpan)
}
return otSpan
}
func mappingDatabaseTag(span *agent.SpanObject, otSpan *skywalkingv3.OtSpan) {
if span.GetPeer() == "" {
return
}
if span.SpanLayer != agent.SpanLayer_Database {
return
}
var dbType string
for _, tag := range span.GetTags() {
if tag.Key == "db.type" {
dbType = tag.Value
break
}
}
if dbType == "" {
return
}
otSpan.Attribute[skywalkingv3.AttributeDBConnectionString] = strings.ToLower(dbType) + "://" + span.GetPeer()
}
func convertUniIDToString(u *agent.UniqueId) string {
if len(u.IdParts) == 0 {
return ""
}
parentTraceSegmentID := ""
for _, part := range u.IdParts {
parentTraceSegmentID += fmt.Sprintf("%d.", part)
}
return parentTraceSegmentID[:len(parentTraceSegmentID)-1]
}
func (t *TraceSegmentHandle) mappingMessageSystemTag(span *agent.SpanObject, otSpan *skywalkingv3.OtSpan) {
messageSystem, finded := t.compIDMessagingSystemMapping[span.ComponentId]
if finded {
otSpan.Attribute[skywalkingv3.AttributeMessagingSystem] = messageSystem
} else {
otSpan.Attribute[skywalkingv3.AttributeMessagingSystem] = "MessagingSystem"
}
}