pkg/helper/pipeline_event_helper.go (210 lines of code) (raw):

package helper import ( "fmt" "sync" "time" "github.com/alibaba/ilogtail/pkg/models" "github.com/alibaba/ilogtail/pkg/protocol" "github.com/alibaba/ilogtail/pkg/util" ) var LogEventPool = sync.Pool{ New: func() interface{} { return new(protocol.LogEvent) }, } func CreateLogEvent(t time.Time, enableTimestampNano bool, fields map[string]string) (*protocol.LogEvent, error) { logEvent := LogEventPool.Get().(*protocol.LogEvent) logEvent.Timestamp = uint64(t.Unix())*1e9 + uint64(t.Nanosecond()) if len(logEvent.Contents) < len(fields) { slice := make([]*protocol.LogEvent_Content, len(logEvent.Contents), len(fields)) copy(slice, logEvent.Contents) logEvent.Contents = slice } else { logEvent.Contents = logEvent.Contents[:len(fields)] } i := 0 rawSize := 0 for key, val := range fields { if i >= len(logEvent.Contents) { logEvent.Contents = append(logEvent.Contents, &protocol.LogEvent_Content{}) } logEvent.Contents[i].Key = util.ZeroCopyStringToBytes(key) logEvent.Contents[i].Value = util.ZeroCopyStringToBytes(val) i++ rawSize += len(val) } logEvent.RawSize = uint64(rawSize) return logEvent, nil } func CreateLogEventByArray(t time.Time, enableTimestampNano bool, columns []string, values []string) (*protocol.LogEvent, error) { logEvent := LogEventPool.Get().(*protocol.LogEvent) logEvent.Timestamp = uint64(t.Unix())*1e9 + uint64(t.Nanosecond()) logEvent.Contents = make([]*protocol.LogEvent_Content, 0, len(columns)) if len(columns) != len(values) { return nil, fmt.Errorf("columns and values not equal") } rawSize := 0 for index := range columns { if index >= len(logEvent.Contents) { logEvent.Contents = append(logEvent.Contents, &protocol.LogEvent_Content{}) } logEvent.Contents[index].Key = util.ZeroCopyStringToBytes(columns[index]) logEvent.Contents[index].Value = util.ZeroCopyStringToBytes(values[index]) rawSize += len(values[index]) } logEvent.RawSize = uint64(rawSize) return logEvent, nil } func CreateLogEventByLegacyRawLog(log *protocol.Log) (*protocol.LogEvent, error) { logEvent := LogEventPool.Get().(*protocol.LogEvent) logEvent.Timestamp = uint64(log.GetTime())*1e9 + uint64(log.GetTimeNs()) logEvent.Contents = make([]*protocol.LogEvent_Content, 0, len(log.Contents)) rawSize := 0 for i, logC := range log.Contents { if i >= len(logEvent.Contents) { logEvent.Contents = append(logEvent.Contents, &protocol.LogEvent_Content{}) } logEvent.Contents[i].Key = util.ZeroCopyStringToBytes(logC.Key) logEvent.Contents[i].Value = util.ZeroCopyStringToBytes(logC.Value) rawSize += len(logC.Value) } logEvent.RawSize = uint64(rawSize) return logEvent, nil } func TransferLogEventToPB(log *models.Log) (*protocol.LogEvent, error) { logEvent := LogEventPool.Get().(*protocol.LogEvent) logEvent.Timestamp = log.GetTimestamp() logEvent.Contents = make([]*protocol.LogEvent_Content, 0, log.Contents.Len()) for k, v := range log.Contents.Iterator() { cont := &protocol.LogEvent_Content{ Key: util.ZeroCopyStringToBytes(k), Value: util.ZeroCopyStringToBytes(v.(string)), } logEvent.Contents = append(logEvent.Contents, cont) } logEvent.Level = util.ZeroCopyStringToBytes(log.GetLevel()) logEvent.FileOffset = log.GetOffset() logEvent.RawSize = log.GetRawSize() return logEvent, nil } func TransferMetricEventToPB(metric *models.Metric) (*protocol.MetricEvent, error) { var metricEvent protocol.MetricEvent metricEvent.Timestamp = metric.GetTimestamp() metricEvent.Name = util.ZeroCopyStringToBytes(metric.GetName()) if metric.GetValue().IsSingleValue() { metricEvent.Value = &protocol.MetricEvent_UntypedSingleValue{UntypedSingleValue: &protocol.UntypedSingleValue{Value: metric.Value.GetSingleValue()}} } else { return nil, fmt.Errorf("unsupported metric value type") } metricEvent.Tags = make(map[string][]byte, metric.GetTags().Len()) for k, v := range metric.GetTags().Iterator() { metricEvent.Tags[k] = util.ZeroCopyStringToBytes(v) } return &metricEvent, nil } func TransferSpanEventToPB(span *models.Span) (*protocol.SpanEvent, error) { var spanEvent protocol.SpanEvent spanEvent.Timestamp = span.GetTimestamp() spanEvent.TraceID = util.ZeroCopyStringToBytes(span.GetTraceID()) spanEvent.SpanID = util.ZeroCopyStringToBytes(span.GetSpanID()) spanEvent.TraceState = util.ZeroCopyStringToBytes(span.GetTraceState()) spanEvent.ParentSpanID = util.ZeroCopyStringToBytes(span.GetParentSpanID()) spanEvent.Name = util.ZeroCopyStringToBytes(span.GetName()) spanEvent.Kind = protocol.SpanEvent_SpanKind(span.GetKind()) spanEvent.StartTime = span.GetStartTime() spanEvent.EndTime = span.GetEndTime() spanEvent.Tags = make(map[string][]byte, span.GetTags().Len()) for k, v := range span.GetTags().Iterator() { spanEvent.Tags[k] = util.ZeroCopyStringToBytes(v) } spanEvent.Events = make([]*protocol.SpanEvent_InnerEvent, 0, len(span.GetEvents())) for _, srcEvent := range span.GetEvents() { dstEvent := protocol.SpanEvent_InnerEvent{ Timestamp: uint64(srcEvent.Timestamp), Name: util.ZeroCopyStringToBytes(srcEvent.Name), Tags: make(map[string][]byte, srcEvent.Tags.Len()), } for k, v := range srcEvent.Tags.Iterator() { dstEvent.Tags[k] = util.ZeroCopyStringToBytes(v) } spanEvent.Events = append(spanEvent.Events, &dstEvent) } spanEvent.Links = make([]*protocol.SpanEvent_SpanLink, 0, len(span.GetLinks())) for _, srcLink := range span.GetLinks() { dstLink := protocol.SpanEvent_SpanLink{ TraceID: util.ZeroCopyStringToBytes(srcLink.TraceID), SpanID: util.ZeroCopyStringToBytes(srcLink.SpanID), TraceState: util.ZeroCopyStringToBytes(srcLink.TraceState), Tags: make(map[string][]byte, srcLink.Tags.Len()), } for k, v := range srcLink.Tags.Iterator() { dstLink.Tags[k] = util.ZeroCopyStringToBytes(v) } spanEvent.Links = append(spanEvent.Links, &dstLink) } spanEvent.Status = protocol.SpanEvent_StatusCode(span.GetStatus()) return &spanEvent, nil } func CreatePipelineEventGroupByLegacyRawLog(logEvents []*protocol.LogEvent, configTag map[string]string, logTags map[string]string, ctx map[string]interface{}) (*protocol.PipelineEventGroup, error) { var pipelineEventGroup protocol.PipelineEventGroup pipelineEventGroup.PipelineEvents = &protocol.PipelineEventGroup_Logs{Logs: &protocol.PipelineEventGroup_LogEvents{Events: logEvents}} pipelineEventGroup.Tags = make(map[string][]byte, len(configTag)+len(logTags)) for k, v := range configTag { pipelineEventGroup.Tags[k] = util.ZeroCopyStringToBytes(v) } for k, v := range logTags { pipelineEventGroup.Tags[k] = util.ZeroCopyStringToBytes(v) } if ctx != nil { if source, ok := ctx["source"].(string); ok { pipelineEventGroup.Metadata = make(map[string][]byte) pipelineEventGroup.Metadata["source_id"] = util.ZeroCopyStringToBytes(source) } } return &pipelineEventGroup, nil } func TransferPipelineEventGroupToPB(groupInfo *models.GroupInfo, events []models.PipelineEvent) (*protocol.PipelineEventGroup, error) { var pipelineEventGroup protocol.PipelineEventGroup if len(events) == 0 { return nil, fmt.Errorf("events is empty") } eventType := events[0].GetType() switch eventType { case models.EventTypeLogging: logEvents := make([]*protocol.LogEvent, 0, len(events)) for _, event := range events { if logSrc, ok := event.(*models.Log); ok { logDst, _ := TransferLogEventToPB(logSrc) logEvents = append(logEvents, logDst) } } pipelineEventGroup.PipelineEvents = &protocol.PipelineEventGroup_Logs{Logs: &protocol.PipelineEventGroup_LogEvents{Events: logEvents}} case models.EventTypeMetric: metricEvents := make([]*protocol.MetricEvent, 0, len(events)) for _, event := range events { if metricSrc, ok := event.(*models.Metric); ok { metricDst, _ := TransferMetricEventToPB(metricSrc) metricEvents = append(metricEvents, metricDst) } } pipelineEventGroup.PipelineEvents = &protocol.PipelineEventGroup_Metrics{Metrics: &protocol.PipelineEventGroup_MetricEvents{Events: metricEvents}} case models.EventTypeSpan: spanEvents := make([]*protocol.SpanEvent, 0, len(events)) for _, event := range events { if spanSrc, ok := event.(*models.Span); ok { spanDst, _ := TransferSpanEventToPB(spanSrc) spanEvents = append(spanEvents, spanDst) } } pipelineEventGroup.PipelineEvents = &protocol.PipelineEventGroup_Spans{Spans: &protocol.PipelineEventGroup_SpanEvents{Events: spanEvents}} } pipelineEventGroup.Tags = make(map[string][]byte, groupInfo.Tags.Len()) for k, v := range groupInfo.Tags.Iterator() { pipelineEventGroup.Tags[k] = util.ZeroCopyStringToBytes(v) } pipelineEventGroup.Metadata = make(map[string][]byte, groupInfo.Metadata.Len()) for k, v := range groupInfo.Metadata.Iterator() { pipelineEventGroup.Metadata[k] = util.ZeroCopyStringToBytes(v) } return &pipelineEventGroup, nil }