src/azure/Elastic.Apm.Azure.ServiceBus/AzureMessagingServiceBusDiagnosticListener.cs (393 lines of code) (raw):

// Licensed to Elasticsearch B.V under // one or more agreements. // Elasticsearch B.V licenses this file to you under the Apache 2.0 License. // See the LICENSE file in the project root for more information using System; using System.Collections; using System.Collections.Concurrent; using System.Collections.Generic; using System.Diagnostics; using System.Linq; using System.Reflection; using Elastic.Apm.Api; using Elastic.Apm.DiagnosticListeners; using Elastic.Apm.Helpers; using Elastic.Apm.Logging; using Elastic.Apm.Model; using Queue = Elastic.Apm.Api.Queue; namespace Elastic.Apm.Azure.ServiceBus { /// <summary> /// Creates spans for diagnostic events from Azure.Messaging.ServiceBus /// </summary> internal class AzureMessagingServiceBusDiagnosticListener : DiagnosticListenerBase { private readonly ApmAgent _realAgent; private readonly ConcurrentDictionary<string, IExecutionSegment> _processingSegments = new ConcurrentDictionary<string, IExecutionSegment>(); private readonly Framework _framework; public override string Name { get; } = "Azure.Messaging.ServiceBus"; public AzureMessagingServiceBusDiagnosticListener(IApmAgent agent) : base(agent) { _realAgent = agent as ApmAgent; _framework = new Framework { Name = ServiceBus.SegmentName }; } protected override void HandleOnNext(KeyValuePair<string, object> kv) { Logger.Trace()?.Log("Called with key: `{DiagnosticEventKey}'", kv.Key); if (string.IsNullOrEmpty(kv.Key)) { Logger.Trace()?.Log($"Key is {(kv.Key == null ? "null" : "an empty string")} - exiting"); return; } switch (kv.Key) { case "Message.Start": OnMessageStart(kv, "PREPARE"); break; case "Message.Stop": OnMessageStop(); break; case "ServiceBusSender.Send.Start": OnSendStart(kv, "SEND"); break; case "ServiceBusSender.Schedule.Start": OnSendStart(kv, "SCHEDULE"); break; case "ServiceBusReceiver.Receive.Start": OnReceiveStart(kv, "RECEIVE"); break; case "ServiceBusReceiver.ReceiveDeferred.Start": OnReceiveStart(kv, "RECEIVEDEFERRED"); break; case "ServiceBusProcessor.ProcessMessage.Start": case "ServiceBusSessionProcessor.ProcessSessionMessage.Start": OnProcessStart(kv, "PROCESS"); break; case "ServiceBusSender.Send.Stop": case "ServiceBusSender.Schedule.Stop": case "ServiceBusReceiver.Receive.Stop": case "ServiceBusReceiver.ReceiveDeferred.Stop": case "ServiceBusProcessor.ProcessMessage.Stop": case "ServiceBusSessionProcessor.ProcessSessionMessage.Stop": OnStop(); break; case "ServiceBusSender.Send.Exception": case "ServiceBusSender.Schedule.Exception": case "ServiceBusReceiver.Receive.Exception": case "ServiceBusReceiver.ReceiveDeferred.Exception": case "ServiceBusProcessor.ProcessMessage.Exception": case "ServiceBusSessionProcessor.ProcessSessionMessage.Exception": OnException(kv); break; default: Logger.Trace()?.Log("`{DiagnosticEventKey}' key is not a traced diagnostic event", kv.Key); break; } } private IExecutionSegment _onMessageCurrent; private void OnMessageStart(KeyValuePair<string, object> kv, string action) { var currentSegment = ApmAgent.GetCurrentExecutionSegment(); if (currentSegment is null) return; if (kv.Value is not Activity activity) { Logger.Trace()?.Log("Value is not an activity - exiting"); return; } string queueName = null; foreach (var tag in activity.Tags) { switch (tag.Key) { case "message_bus.destination": queueName = tag.Value; break; default: continue; } } if (MatchesIgnoreMessageQueues(queueName)) return; var name = $"{ServiceBus.SegmentName} {action} message"; _onMessageCurrent = currentSegment switch { // NOTE: We explicity set the SpanId here to match the Activity (value of the payload) to ensure that span linking on the // receiver works as expected. The Azure SDK attaches the diagnostic-id and traceparent to the message automatically. // On the receiving end, we need to be able to correctly link the consuming span to the producer. Span span => span.StartSpanInternal(name, ApiConstants.TypeMessaging, ServiceBus.SubType, action.ToLowerInvariant(), id: activity.SpanId.ToString()), Transaction transaction => transaction.StartSpanInternal(name, ApiConstants.TypeMessaging, ServiceBus.SubType, action.ToLowerInvariant(), id: activity.SpanId.ToString()), _ => _onMessageCurrent }; } private void OnMessageStop() => _onMessageCurrent?.End(); private void OnProcessStart(KeyValuePair<string, object> kv, string action) { if (kv.Value is not Activity activity) { Logger.Trace()?.Log("Value is not an activity - exiting"); return; } string queueName = null; foreach (var tag in activity.Tags) { switch (tag.Key) { case "message_bus.destination": queueName = tag.Value; break; default: continue; } } if (MatchesIgnoreMessageQueues(queueName)) return; var transactionName = queueName is null ? $"{ServiceBus.SegmentName} {action}" : $"{ServiceBus.SegmentName} {action} from {queueName}"; var transaction = ApmAgent.Tracer.StartTransaction(transactionName, ApiConstants.TypeMessaging); transaction.Context.Service = new Service(null, null) { Framework = _framework }; if (queueName != null) transaction.Context.Message = new Message { Queue = new Queue { Name = queueName } }; // transaction creation will create an activity, so use this as the key. if (Activity.Current != null) { var activityId = Activity.Current.Id; if (!_processingSegments.TryAdd(activityId, transaction)) { Logger.Trace() ?.Log( "Could not add {Action} transaction {TransactionId} for activity {ActivityId} to tracked segments", action, transaction.Id, activityId); } } } private void OnReceiveStart(KeyValuePair<string, object> kv, string action) { if (kv.Value is not Activity activity) { Logger.Trace()?.Log("Value is not an activity - exiting"); return; } string queueName = null; foreach (var tag in activity.Tags) { switch (tag.Key) { case "message_bus.destination": queueName = tag.Value; break; default: continue; } } if (MatchesIgnoreMessageQueues(queueName)) return; var transactionName = queueName is null ? $"{ServiceBus.SegmentName} {action}" : $"{ServiceBus.SegmentName} {action} from {queueName}"; IExecutionSegment segment; if (ApmAgent.Tracer.CurrentTransaction is null) { var transaction = ApmAgent.Tracer.StartTransaction(transactionName, ApiConstants.TypeMessaging); transaction.Context.Service = new Service(null, null) { Framework = _framework }; if (queueName != null) transaction.Context.Message = new Message { Queue = new Queue { Name = queueName } }; segment = transaction; } else { var span = ApmAgent.GetCurrentExecutionSegment() .StartSpan(transactionName, ApiConstants.TypeMessaging, ServiceBus.SubType, action, isExitSpan: true); SetMessageAndServiceTarget(queueName, span); segment = span; } // transaction creation will create an activity, so use this as the key. if (Activity.Current != null) { var activityId = Activity.Current.Id; if (!_processingSegments.TryAdd(activityId, segment)) { Logger.Trace() ?.Log( "Could not add {Action} {SegmentName} {TransactionId} for activity {ActivityId} to tracked segments", action, segment is ITransaction ? "transaction" : "span", segment.Id, activityId); } } } private bool MatchesIgnoreMessageQueues(string name) { if (name != null && _realAgent != null) { var matcher = WildcardMatcher.AnyMatch(_realAgent.ConfigurationStore.CurrentSnapshot.IgnoreMessageQueues, name); if (matcher != null) { Logger.Debug() ?.Log( "Not tracing message from {QueueName} because it matched IgnoreMessageQueues pattern {Matcher}", name, matcher.GetMatcher()); return true; } } return false; } private void OnSendStart(KeyValuePair<string, object> kv, string action) { var currentSegment = ApmAgent.GetCurrentExecutionSegment(); if (currentSegment is null) { Logger.Trace()?.Log("No current transaction or span - exiting"); return; } if (kv.Value is not Activity activity) { Logger.Trace()?.Log("Value is not an activity - exiting"); return; } string queueName = null; string destinationAddress = null; foreach (var tag in activity.Tags) { switch (tag.Key) { case "message_bus.destination": queueName = tag.Value; break; case "peer.address": destinationAddress = tag.Value; break; default: continue; } } if (MatchesIgnoreMessageQueues(queueName)) return; var spanName = queueName is null ? $"{ServiceBus.SegmentName} {action}" : $"{ServiceBus.SegmentName} {action} to {queueName}"; var span = currentSegment.StartSpan(spanName, ApiConstants.TypeMessaging, ServiceBus.SubType, action.ToLowerInvariant()); span.Context.Destination = new Destination { Address = destinationAddress, Service = new Destination.DestinationService { Resource = queueName is null ? ServiceBus.SubType : $"{ServiceBus.SubType}/{queueName}" } }; SetMessageAndServiceTarget(queueName, span); if (!_processingSegments.TryAdd(activity.Id, span)) { Logger.Trace() ?.Log( "Could not add {Action} span {SpanId} for activity {ActivityId} to tracked spans", action, span.Id, activity.Id); } } private void OnStop() { var activity = Activity.Current; if (activity is null) { Logger.Trace()?.Log("Current activity is null - exiting"); return; } PopulateLinks(); if (!_processingSegments.TryRemove(activity.Id, out var segment)) { Logger.Trace() ?.Log( "Could not find segment for activity {ActivityId} in tracked segments", activity.Id); return; } segment.Outcome = Outcome.Success; segment.End(); } private void OnException(KeyValuePair<string, object> kv) { var activity = Activity.Current; if (activity is null) { Logger.Trace()?.Log("Current activity is null - exiting"); return; } if (!_processingSegments.TryRemove(activity.Id, out var segment)) { Logger.Trace() ?.Log( "Could not find segment for activity {ActivityId} in tracked segments", activity.Id); return; } PopulateLinks(); if (kv.Value is Exception e) segment.CaptureException(e); segment.Outcome = Outcome.Failure; segment.End(); } // ServiceBusReceiver isn't directly referenced - disable warning for it #pragma warning disable CS1574 /// <summary> /// <see cref="ServiceBusReceiver"/> creates activity links based on the `Diagnostic-Id` property of the message. /// This property is written based the activity on the producer side. /// When reading a batch of messages we create span links for each parent. /// We walk up the activity chain, look for the ServiceBusReceiver activity and use these activity links to create span links. /// </summary> #pragma warning restore CS1574 private void PopulateLinks() { var current = Activity.Current; while (current != null && !current.DisplayName.Contains("ServiceBusReceiver")) { if (!current.DisplayName.StartsWith("ServiceBusReceiver")) current = current.Parent; } if (current == null) return; // The type of Activity.Links got change across versions. // If different the compiled version is different from the runtime version, we can't use the .Links property // Therefore we fetch the value via reflection var linksProperties = current.GetType().GetProperties().Where(n => n.Name == "Links"); var propertyInfos = linksProperties as PropertyInfo[] ?? linksProperties.ToArray(); if (!propertyInfos.Any()) return; // The type is either `Activity` or `ActivityLink`. foreach (var prop in propertyInfos) { if (prop.GetValue(current) is IEnumerable<Activity> activityList && activityList.Any()) CollectSpanLinks(activityList); else { if (prop.GetValue(current) is IEnumerable<ActivityLink> activityLinkList && activityLinkList.Any()) CollectSpanLinks(activityLinkList); else return; } } void CollectSpanLinks(object links) { if (Activity.Current?.Id != null && _processingSegments.TryGetValue(Activity.Current.Id, out var segment)) { var spanLinks = new List<SpanLink>(); if (links is not IEnumerable iEnumerable) return; foreach (var link in iEnumerable) { // According to spec we only create up to 1k links. if (spanLinks.Count == 1000) break; if (link is Activity activity) { spanLinks.Add(new SpanLink(activity.ParentSpanId.ToString(), activity.TraceId.ToString())); } else if (link is ActivityLink activityLink) { spanLinks.Add(new SpanLink(activityLink.Context.SpanId.ToString(), activityLink.Context.TraceId.ToString())); } } if (segment is Transaction realTransaction) realTransaction.InsertSpanLinkInternal(spanLinks); else if (segment is Span realSpan) realSpan.InsertSpanLinkInternal(spanLinks); } } } private static void SetMessageAndServiceTarget(string queueName, ISpan span) { if (queueName is not null) { span.Context.Message = new Message { Queue = new Queue { Name = queueName } }; } // queueName may be null here which is fine span.Context.Service = new SpanService(new Target(ServiceBus.SubType, queueName)); } } }