src/Elastic.Apm/OpenTelemetry/ElasticActivityListener.cs (377 lines of code) (raw):

// Licensed to Elasticsearch B.V under // one or more agreements. // Elasticsearch B.V licenses this file to you under the Apache 2.0 License. // See the LICENSE file in the project root for more information #if NET8_0_OR_GREATER using System; using System.Collections.Generic; using System.Diagnostics; using System.Linq; using System.Runtime.CompilerServices; using Elastic.Apm.Api; using Elastic.Apm.DiagnosticListeners; using Elastic.Apm.DistributedTracing; using Elastic.Apm.Helpers; using Elastic.Apm.Logging; using Elastic.Apm.Model; namespace Elastic.Apm.OpenTelemetry { public class ElasticActivityListener : IDisposable { private static readonly string[] ServerPortAttributeKeys = [SemanticConventions.ServerPort, SemanticConventions.NetPeerPort]; private static readonly string[] ServerAddressAttributeKeys = [SemanticConventions.ServerAddress, SemanticConventions.NetPeerName, SemanticConventions.NetPeerIp]; private static readonly string[] HttpAttributeKeys = [SemanticConventions.UrlFull, SemanticConventions.HttpUrl, SemanticConventions.HttpScheme]; private static readonly string[] HttpUrlAttributeKeys = [SemanticConventions.UrlFull, SemanticConventions.HttpUrl]; private readonly ConditionalWeakTable<Activity, Span> _activeSpans = new(); private readonly ConditionalWeakTable<Activity, Transaction> _activeTransactions = new(); internal ElasticActivityListener(IApmAgent agent, HttpTraceConfiguration httpTraceConfiguration) => (_logger, _httpTraceConfiguration) = (agent.Logger?.Scoped(nameof(ElasticActivityListener)), httpTraceConfiguration); private static readonly bool HasServiceBusInstrumentation = AppDomain.CurrentDomain.GetAssemblies().SingleOrDefault(assembly => assembly.GetName().Name == "Elastic.Apm.Azure.ServiceBus") != null; private static readonly bool HasStorageInstrumentation = AppDomain.CurrentDomain.GetAssemblies().SingleOrDefault(assembly => assembly.GetName().Name == "Elastic.Apm.Azure.Storage") != null; private readonly IApmLogger _logger; private Tracer _tracer; private readonly HttpTraceConfiguration _httpTraceConfiguration; private bool _disposed; internal void Start(Tracer tracerInternal) { _tracer = tracerInternal; Listener = new ActivityListener { ActivityStarted = ActivityStarted, ActivityStopped = ActivityStopped, ShouldListenTo = _ => true, Sample = (ref ActivityCreationOptions<ActivityContext> _) => ActivitySamplingResult.AllData }; ActivitySource.AddActivityListener(Listener); } private ActivityListener Listener { get; set; } private Action<Activity> ActivityStarted => activity => { // Prevent recording of Azure Functions activities which are quite broken at the moment // See https://github.com/Azure/azure-functions-dotnet-worker/issues/2733 // See https://github.com/Azure/azure-functions-dotnet-worker/issues/2875 // See https://github.com/Azure/azure-functions-host/issues/10641 // See https://github.com/Azure/azure-functions-dotnet-worker/issues/2810 if ((activity.Source.Name == "" && activity.DisplayName == "InvokeFunctionAsync") || (activity.Source.Name == "Microsoft.Azure.Functions.Worker")) { return; } // If the Elastic instrumentation for ServiceBus is present, we skip duplicating the instrumentation through the OTel bridge. // Without this, we end up with some redundant spans in the trace with subtle differences. if (HasServiceBusInstrumentation && activity.Tags.Any(kvp => kvp.Key.Equals("az.namespace", StringComparison.Ordinal) && kvp.Value.Equals("Microsoft.ServiceBus", StringComparison.Ordinal))) { _logger?.Debug()?.Log("ActivityStarted: name:{DisplayName} id:{ActivityId} traceId:{TraceId} skipped 'Microsoft.ServiceBus' " + "activity because 'Elastic.Apm.Azure.ServiceBus' is present in the application.", activity.DisplayName, activity.Id, activity.TraceId); return; } if (HasStorageInstrumentation && activity.Tags.Any(kvp => kvp.Key.Equals("az.namespace", StringComparison.Ordinal) && kvp.Value.Equals("Microsoft.Storage", StringComparison.Ordinal))) { _logger?.Debug()?.Log("ActivityStarted: name:{DisplayName} id:{ActivityId} traceId:{TraceId} skipped 'Microsoft.Storage' " + "activity because 'Elastic.Apm.Azure.Storage' is present in the application.", activity.DisplayName, activity.Id, activity.TraceId); return; } if (KnownListeners.SkippedActivityNamesSet.Contains(activity.DisplayName)) { _logger?.Trace()?.Log("ActivityStarted: name:{DisplayName} id:{ActivityId} traceId:{TraceId} skipped because it matched " + "a skipped activity name defined in KnownListeners.", activity.DisplayName, activity.Id, activity.TraceId); return; } _logger?.Trace()?.Log("ActivityStarted: name:{DisplayName} id:{ActivityId} traceId:{TraceId}", activity.DisplayName, activity.Id, activity.TraceId); var spanLinks = new List<SpanLink>(activity.Links.Count()); if (activity.Links.Any()) { foreach (var link in activity.Links) spanLinks.Add(new SpanLink(link.Context.SpanId.ToString(), link.Context.TraceId.ToString())); } var timestamp = TimeUtils.ToTimestamp(activity.StartTimeUtc); if (!CreateTransactionForActivity(activity, timestamp, spanLinks)) CreateSpanForActivity(activity, timestamp, spanLinks); }; private bool CreateTransactionForActivity(Activity activity, long timestamp, List<SpanLink> spanLinks) { Transaction transaction = null; if (activity.ParentId != null && _tracer.CurrentTransaction == null) { var dt = TraceContext.TryExtractTracingData(activity.ParentId, activity.Context.TraceState); transaction = _tracer.StartTransactionInternal(activity.DisplayName, "unknown", timestamp, true, activity.SpanId.ToString(), distributedTracingData: dt, links: spanLinks, current: activity); } else if (activity.ParentId == null) { transaction = _tracer.StartTransactionInternal(activity.DisplayName, "unknown", timestamp, true, activity.SpanId.ToString(), activity.TraceId.ToString(), links: spanLinks, current: activity); } if (transaction == null) return false; transaction.Otel = new OTel { SpanKind = activity.Kind.ToString() }; if (activity.Id != null) _activeTransactions.AddOrUpdate(activity, transaction); return true; } private void CreateSpanForActivity(Activity activity, long timestamp, List<SpanLink> spanLinks) { Span newSpan; if (_tracer.CurrentSpan == null) { newSpan = (_tracer.CurrentTransaction as Transaction)?.StartSpanInternal(activity.DisplayName, "unknown", timestamp: timestamp, id: activity.SpanId.ToString(), links: spanLinks, current: activity); } else { newSpan = (_tracer.CurrentSpan as Span)?.StartSpanInternal(activity.DisplayName, "unknown", timestamp: timestamp, id: activity.SpanId.ToString(), links: spanLinks, current: activity); } if (newSpan == null) return; newSpan.Otel = new OTel { SpanKind = activity.Kind.ToString() }; if (activity.Kind == ActivityKind.Internal) { newSpan.Type = "app"; newSpan.Subtype = "internal"; } if (activity.Id != null) _activeSpans.AddOrUpdate(activity, newSpan); } private Action<Activity> ActivityStopped => activity => { if (activity == null) { _logger?.Trace()?.Log("ActivityStopped called with `null` activity. Ignoring `null` activity."); return; } activity.Stop(); _logger?.Trace()?.Log("ActivityStopped: name:{DisplayName} id:{ActivityId} traceId:{TraceId}", activity.DisplayName, activity.Id, activity.TraceId); if (KnownListeners.SkippedActivityNamesSet.Contains(activity.DisplayName)) return; if (activity.Id == null) return; if (_activeTransactions.TryGetValue(activity, out var transaction)) { _activeTransactions.Remove(activity); transaction.Duration = activity.Duration.TotalMilliseconds; UpdateOTelAttributes(activity, transaction.Otel); InferTransactionType(transaction, activity); // By default we set unknown outcome transaction.Outcome = Outcome.Unknown; #if NET8_0_OR_GREATER switch (activity.Status) { case ActivityStatusCode.Unset: transaction.Outcome = Outcome.Unknown; break; case ActivityStatusCode.Ok: transaction.Outcome = Outcome.Success; break; case ActivityStatusCode.Error: transaction.Outcome = Outcome.Failure; break; } #endif transaction.End(); } else if (_activeSpans.TryGetValue(activity, out var span)) { _activeSpans.Remove(activity); UpdateSpan(activity, span); } }; private static void UpdateOTelAttributes(Activity activity, OTel otel) { if (!activity.TagObjects.Any()) return; // https://opentelemetry.io/docs/specs/otel/common/#attribute-limits // copy max 128 keys and truncate values to 10k chars (the current maximum for e.g. statement.db). var i = 0; otel.Attributes ??= new Dictionary<string, object>(); foreach (var (key, value) in activity.TagObjects) { if (i >= 128) break; if (value is string s) otel.Attributes[key] = s.Truncate(10_000); else otel.Attributes[key] = value; i++; } } private static void UpdateSpan(Activity activity, Span span) { span.Duration = activity.Duration.TotalMilliseconds; UpdateOTelAttributes(activity, span.Otel); InferSpanTypeAndSubType(span, activity); // By default we set unknown outcome span.Outcome = Outcome.Unknown; #if NET8_0_OR_GREATER switch (activity.Status) { case ActivityStatusCode.Unset: span.Outcome = Outcome.Unknown; break; case ActivityStatusCode.Ok: span.Outcome = Outcome.Success; break; case ActivityStatusCode.Error: span.Outcome = Outcome.Failure; break; } #endif span.End(); } /// <summary> /// Specifically exposed for benchmarking. This is not intended for any other purpose. /// </summary> internal static void UpdateSpanBenchmark(Activity activity, Span span) => UpdateSpan(activity, span); private static void InferTransactionType(Transaction transaction, Activity activity) { if (activity.Kind == ActivityKind.Server && (TryGetStringValue(activity, SemanticConventions.RpcSystem, out _) || TryGetStringValue(activity, HttpAttributeKeys, out _))) transaction.Type = ApiConstants.TypeRequest; else if (activity.Kind == ActivityKind.Consumer && TryGetStringValue(activity, SemanticConventions.MessagingSystem, out _)) transaction.Type = ApiConstants.TypeMessaging; else transaction.Type = "unknown"; } private static void InferSpanTypeAndSubType(Span span, Activity activity) { static string HttpPortFromScheme(string scheme) { return scheme switch { "http" => "80", "https" => "443", _ => string.Empty }; } // extracts 'host' or 'host:port' from URL static string ParseNetName(string url) { try { var u = new Uri(url); // https://developer.mozilla.org/en-US/docs/Web/API/URL return u.Host + ':' + u.Port; } catch { return string.Empty; } } static string ToResourceName(string type, string name) { return string.IsNullOrEmpty(name) ? type : $"{type}/{name}"; } var peerPort = string.Empty; var netName = string.Empty; if (TryGetStringValue(activity, ServerPortAttributeKeys, out var netPortValue)) peerPort = netPortValue; if (TryGetStringValue(activity, ServerAddressAttributeKeys, out var netNameValue)) netName = netNameValue; if (netName.Length > 0 && peerPort.Length > 0) { netName += ':'; netName += peerPort; } string serviceTargetType = null; string serviceTargetName = null; string resource = null; if (TryGetStringValue(activity, SemanticConventions.DbSystem, out var dbSystem)) { span.Type = ApiConstants.TypeDb; span.Subtype = dbSystem; serviceTargetType = span.Subtype; serviceTargetName = TryGetStringValue(activity, SemanticConventions.DbName, out var dbName) ? dbName : null; resource = ToResourceName(span.Subtype, serviceTargetName); } else if (TryGetStringValue(activity, SemanticConventions.MessagingSystem, out var messagingSystem)) { span.Type = ApiConstants.TypeMessaging; span.Subtype = messagingSystem; serviceTargetType = span.Subtype; serviceTargetName = TryGetStringValue(activity, SemanticConventions.MessagingDestination, out var messagingDestination) ? messagingDestination : null; resource = ToResourceName(span.Subtype, serviceTargetName); } else if (TryGetStringValue(activity, SemanticConventions.RpcSystem, out var rpcSystem)) { span.Type = ApiConstants.TypeExternal; span.Subtype = rpcSystem; serviceTargetType = span.Subtype; serviceTargetName = !string.IsNullOrEmpty(netName) ? netName : TryGetStringValue(activity, SemanticConventions.RpcService, out var rpcService) ? rpcService : null; resource = serviceTargetName ?? span.Subtype; } else if (activity.TagObjects.Any(n => n.Key == SemanticConventions.HttpUrl || n.Key == SemanticConventions.UrlFull || n.Key == SemanticConventions.HttpScheme)) { var hasHttpHost = TryGetStringValue(activity, SemanticConventions.HttpHost, out var httpHost); var hasHttpScheme = TryGetStringValue(activity, SemanticConventions.HttpScheme, out var httpScheme); span.Type = ApiConstants.TypeExternal; span.Subtype = ApiConstants.SubtypeHttp; serviceTargetType = span.Subtype; if (hasHttpHost && hasHttpScheme) serviceTargetName = $"{httpHost}:{HttpPortFromScheme(httpScheme)}"; else if (TryGetStringValue(activity, HttpUrlAttributeKeys, out var httpUrl)) serviceTargetName = ParseNetName(httpUrl); else serviceTargetName = netName; resource = serviceTargetName; } if (serviceTargetType == null) { if (activity.Kind == ActivityKind.Internal) { span.Type = ApiConstants.TypeApp; span.Subtype = ApiConstants.SubTypeInternal; } else span.Type = ApiConstants.TypeUnknown; } span.Context.Service = new SpanService(new Target(serviceTargetType, serviceTargetName)); if (resource != null) { span.Context.Destination ??= new Destination(); span.Context.Destination.Service = new Destination.DestinationService { Resource = resource }; } } private static bool TryGetStringValue(Activity activity, string key, out string value) { value = null; var attribute = activity.GetTagItem(key); if (attribute is string stringValue) { value = stringValue; return true; } if (attribute is int intValue) { value = intValue.ToString(); return true; } return false; } private static bool TryGetStringValue(Activity activity, string[] keys, out string value) { value = null; foreach (var key in keys) { if (TryGetStringValue(activity, key, out var attributeValue)) { value = attributeValue; return true; } } return false; } protected virtual void Dispose(bool disposing) { if (!_disposed) { if (disposing) Listener?.Dispose(); _disposed = true; } } public void Dispose() { Dispose(disposing: true); GC.SuppressFinalize(this); } } } #endif