input/otlp/logs.go (177 lines of code) (raw):

// Licensed to Elasticsearch B.V. under one or more contributor // license agreements. See the NOTICE file distributed with // this work for additional information regarding copyright // ownership. Elasticsearch B.V. licenses this file to you under // the Apache License, Version 2.0 (the "License"); you may // not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, // software distributed under the License is distributed on an // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY // KIND, either express or implied. See the License for the // specific language governing permissions and limitations // under the License. // Portions copied from OpenTelemetry Collector (contrib), from the // elastic exporter. // // Copyright 2020, OpenTelemetry Authors // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. package otlp import ( "context" "encoding/hex" "strings" "time" "go.opentelemetry.io/collector/pdata/pcommon" "go.opentelemetry.io/collector/pdata/plog" semconv "go.opentelemetry.io/collector/semconv/v1.27.0" "github.com/elastic/apm-data/model/modelpb" ) // ConsumeLogsResult contains the number of rejected log records and error message for partial success response. type ConsumeLogsResult struct { ErrorMessage string RejectedLogRecords int64 } // ConsumeLogs calls ConsumeLogsWithResult but ignores the result. // It exists to satisfy the go.opentelemetry.io/collector/consumer.Logs interface. func (c *Consumer) ConsumeLogs(ctx context.Context, logs plog.Logs) error { _, err := c.ConsumeLogsWithResult(ctx, logs) return err } // ConsumeLogsWithResult consumes OpenTelemetry log data, converting into // the Elastic APM log model and sending to the reporter. func (c *Consumer) ConsumeLogsWithResult(ctx context.Context, logs plog.Logs) (ConsumeLogsResult, error) { if err := semAcquire(ctx, c.sem, 1); err != nil { return ConsumeLogsResult{}, err } defer c.sem.Release(1) receiveTimestamp := time.Now() resourceLogs := logs.ResourceLogs() batch := make(modelpb.Batch, 0, resourceLogs.Len()) for i := 0; i < resourceLogs.Len(); i++ { c.convertResourceLogs(resourceLogs.At(i), receiveTimestamp, &batch) } if err := c.config.Processor.ProcessBatch(ctx, &batch); err != nil { return ConsumeLogsResult{}, err } return ConsumeLogsResult{RejectedLogRecords: 0}, nil } func (c *Consumer) convertResourceLogs(resourceLogs plog.ResourceLogs, receiveTimestamp time.Time, out *modelpb.Batch) { var timeDelta time.Duration resource := resourceLogs.Resource() baseEvent := modelpb.APMEvent{} baseEvent.Event = &modelpb.Event{} baseEvent.Event.Received = modelpb.FromTime(receiveTimestamp) translateResourceMetadata(resource, &baseEvent) if exportTimestamp, ok := exportTimestamp(resource); ok { timeDelta = receiveTimestamp.Sub(exportTimestamp) } scopeLogs := resourceLogs.ScopeLogs() for i := 0; i < scopeLogs.Len(); i++ { c.convertInstrumentationLibraryLogs(scopeLogs.At(i), &baseEvent, timeDelta, out) } } func (c *Consumer) convertInstrumentationLibraryLogs( in plog.ScopeLogs, baseEvent *modelpb.APMEvent, timeDelta time.Duration, out *modelpb.Batch, ) { otelLogs := in.LogRecords() for i := 0; i < otelLogs.Len(); i++ { event := c.convertLogRecord(otelLogs.At(i), in.Scope(), baseEvent, timeDelta) *out = append(*out, event) } } func (c *Consumer) convertLogRecord( record plog.LogRecord, scope pcommon.InstrumentationScope, baseEvent *modelpb.APMEvent, timeDelta time.Duration, ) *modelpb.APMEvent { event := baseEvent.CloneVT() initEventLabels(event) translateScopeMetadata(scope, event) if record.Timestamp() == 0 { event.Timestamp = modelpb.FromTime(record.ObservedTimestamp().AsTime().Add(timeDelta)) } else { event.Timestamp = modelpb.FromTime(record.Timestamp().AsTime().Add(timeDelta)) } if event.Event == nil { event.Event = &modelpb.Event{} } event.Event.Severity = uint64(record.SeverityNumber()) if event.Log == nil { event.Log = &modelpb.Log{} } event.Log.Level = record.SeverityText() if body := record.Body(); body.Type() != pcommon.ValueTypeEmpty { event.Message = body.AsString() if body.Type() == pcommon.ValueTypeMap { body.Map().Range(func(k string, v pcommon.Value) bool { setLabel(replaceDots(k), event, v) return true }) } } if traceID := record.TraceID(); !traceID.IsEmpty() { event.Trace = &modelpb.Trace{} event.Trace.Id = hex.EncodeToString(traceID[:]) } if spanID := record.SpanID(); !spanID.IsEmpty() { if event.Span == nil { event.Span = &modelpb.Span{} } event.Span.Id = hex.EncodeToString(spanID[:]) } attrs := record.Attributes() var exceptionMessage string var exceptionStacktrace string var exceptionType string var exceptionEscaped bool var eventName string var eventDomain string attrs.Range(func(k string, v pcommon.Value) bool { switch k { case semconv.AttributeExceptionMessage: exceptionMessage = v.Str() case semconv.AttributeExceptionStacktrace: exceptionStacktrace = v.Str() case semconv.AttributeExceptionType: exceptionType = v.Str() case semconv.AttributeExceptionEscaped: exceptionEscaped = v.Bool() case "event.name": eventName = v.Str() case "event.domain": eventDomain = v.Str() case "session.id": if event.Session == nil { event.Session = &modelpb.Session{} } event.Session.Id = v.Str() case semconv.AttributeNetworkConnectionType: if event.Network == nil { event.Network = &modelpb.Network{} } if event.Network.Connection == nil { event.Network.Connection = &modelpb.NetworkConnection{} } event.Network.Connection.Type = v.Str() // data_stream.* case attributeDataStreamDataset: if event.DataStream == nil { event.DataStream = &modelpb.DataStream{} } event.DataStream.Dataset = sanitizeDataStreamDataset(v.Str()) case attributeDataStreamNamespace: if event.DataStream == nil { event.DataStream = &modelpb.DataStream{} } event.DataStream.Namespace = sanitizeDataStreamNamespace(v.Str()) default: setLabel(replaceDots(k), event, v) } return true }) // NOTE: we consider an error anything that contains an exception type // or message, independent of the severity level. if exceptionMessage != "" || exceptionType != "" { event.Error = convertOpenTelemetryExceptionSpanEvent( exceptionType, exceptionMessage, exceptionStacktrace, exceptionEscaped, event.Service.Language.Name, ) } // We need to check if the "event.name" has the "device" prefix based on the removal of the "event.domain" attribute // done in the OTel semantic conventions version 1.24.0. if (eventDomain == "device" && eventName != "") || strings.HasPrefix(eventName, "device.") { event.Event.Category = "device" action := strings.TrimPrefix(eventName, "device.") if action == "crash" { if event.Error == nil { event.Error = &modelpb.Error{} } event.Error.Type = "crash" } else { event.Event.Kind = "event" event.Event.Action = action } } if event.Error != nil { event.Event.Kind = "event" event.Event.Type = "error" } return event }