edge-hub/core/src/Microsoft.Azure.Devices.Edge.Hub.Core/routing/CloudEndpoint.cs (333 lines of code) (raw):

// Copyright (c) Microsoft. All rights reserved.
namespace Microsoft.Azure.Devices.Edge.Hub.Core.Routing
{
    using System;
    using System.Collections.Generic;
    using System.Collections.Immutable;
    using System.IO;
    using System.Linq;
    using System.Threading;
    using System.Threading.Tasks;
    using App.Metrics;
    using App.Metrics.Counter;
    using App.Metrics.Timer;
    using Microsoft.Azure.Devices.Client.Exceptions;
    using Microsoft.Azure.Devices.Edge.Hub.Core.Cloud;
    using Microsoft.Azure.Devices.Edge.Util;
    using Microsoft.Azure.Devices.Edge.Util.TransientFaultHandling;
    using Microsoft.Azure.Devices.Routing.Core;
    using Microsoft.Azure.Devices.Routing.Core.Util;
    using Microsoft.Extensions.Logging;
    using static System.FormattableString;
    using Constants = Microsoft.Azure.Devices.Edge.Hub.Core.Constants;
    using IMessage = Microsoft.Azure.Devices.Edge.Hub.Core.IMessage;
    using IRoutingMessage = Microsoft.Azure.Devices.Routing.Core.IMessage;
    using ISinkResult = Microsoft.Azure.Devices.Routing.Core.ISinkResult<Microsoft.Azure.Devices.Routing.Core.IMessage>;
    using SystemProperties = Microsoft.Azure.Devices.Edge.Hub.Core.SystemProperties;

    /// <summary>
    /// Routing endpoint that sends routed messages upstream through a per-client <see cref="ICloudProxy"/>
    /// obtained from the supplied getter function.
    /// </summary>
    public class CloudEndpoint : Endpoint
    {
        readonly Func<string, Task<Try<ICloudProxy>>> cloudProxyGetterFunc;
        readonly Core.IMessageConverter<IRoutingMessage> messageConverter;
        readonly int maxBatchSize;
        readonly bool trackDeviceState;

        /// <summary>
        /// Creates a cloud endpoint.
        /// </summary>
        /// <param name="id">Endpoint id.</param>
        /// <param name="cloudProxyGetterFunc">Returns the cloud proxy for a client identity ("deviceId" or "deviceId/moduleId").</param>
        /// <param name="messageConverter">Converts routing messages into hub messages before sending.</param>
        /// <param name="trackDeviceState">
        /// When false, <see cref="DeviceInvalidStateException"/> is treated as retryable and any failure to
        /// obtain a cloud proxy is reported as a (retryable) connection failure.
        /// </param>
        /// <param name="maxBatchSize">Upper bound on messages per upstream send; must be greater than 0.</param>
        /// <param name="fanoutFactor">Maximum number of client groups processed in parallel.</param>
        public CloudEndpoint(
            string id,
            Func<string, Task<Try<ICloudProxy>>> cloudProxyGetterFunc,
            Core.IMessageConverter<IRoutingMessage> messageConverter,
            bool trackDeviceState,
            int maxBatchSize = 10,
            int fanoutFactor = 10)
            : base(id)
        {
            Preconditions.CheckArgument(maxBatchSize > 0, "MaxBatchSize should be greater than 0");
            this.cloudProxyGetterFunc = Preconditions.CheckNotNull(cloudProxyGetterFunc);
            this.messageConverter = Preconditions.CheckNotNull(messageConverter);
            this.trackDeviceState = trackDeviceState;
            this.maxBatchSize = maxBatchSize;
            this.FanOutFactor = fanoutFactor;
            Events.Created(id, maxBatchSize, fanoutFactor);
        }

        /// <summary>Endpoint type name (the runtime class name).</summary>
        public override string Type => this.GetType().Name;

        /// <summary>Maximum number of client groups processed in parallel.</summary>
        public override int FanOutFactor { get; }

        /// <inheritdoc />
        public override IProcessor CreateProcessor() => new CloudMessageProcessor(this, this.trackDeviceState);

        /// <inheritdoc />
        public override void LogUserMetrics(long messageCount, long latencyInMs)
        {
            // TODO - No-op for now
        }

        /// <summary>
        /// Processor that groups messages by client identity, splits each group into size-bounded batches
        /// and sends them upstream, classifying failures as retryable (failed) or non-retryable (invalid).
        /// </summary>
        internal class CloudMessageProcessor : IProcessor
        {
            // Exceptions (and subclasses) that cause messages to be reported as failed/retryable
            // instead of invalid.
            readonly ISet<Type> retryableExceptions = new HashSet<Type>
            {
                typeof(TimeoutException),
                typeof(IOException),
                typeof(IotHubException),
                typeof(EdgeHubCloudSDKException),
                typeof(UnauthorizedException) // This indicates the SAS token has expired, and will get a new one.
            };

            readonly CloudEndpoint cloudEndpoint;
            readonly bool trackDeviceState;

            public CloudMessageProcessor(CloudEndpoint endpoint, bool trackDeviceState)
            {
                this.cloudEndpoint = Preconditions.CheckNotNull(endpoint);
                this.trackDeviceState = trackDeviceState;
                if (!trackDeviceState)
                {
                    this.retryableExceptions.Add(typeof(DeviceInvalidStateException));
                }
            }

            public Endpoint Endpoint => this.cloudEndpoint;

            public ITransientErrorDetectionStrategy ErrorDetectionStrategy => new ErrorDetectionStrategy(this.IsTransientException);

            /// <summary>Sends a single message upstream for the client identified by its system properties.</summary>
            public async Task<ISinkResult> ProcessAsync(IRoutingMessage routingMessage, CancellationToken token)
            {
                Preconditions.CheckNotNull(routingMessage, nameof(routingMessage));
                string id = this.GetIdentity(routingMessage);
                ISinkResult result = await this.ProcessClientMessagesBatch(id, new List<IRoutingMessage> { routingMessage }, token);
                Events.DoneProcessing(token);
                return result;
            }

            /// <summary>Sends a collection of messages upstream, grouped by client identity.</summary>
            public Task<ISinkResult> ProcessAsync(ICollection<IRoutingMessage> routingMessages, CancellationToken token)
            {
                // Validate synchronously so a null argument still throws directly to the caller.
                Events.ProcessingMessages(Preconditions.CheckNotNull(routingMessages, nameof(routingMessages)));
                return this.ProcessAndLogCompletionAsync(routingMessages, token);
            }

            public Task CloseAsync(CancellationToken token) => Task.CompletedTask;

            /// <summary>
            /// Largest batch size, capped at <paramref name="batchSize"/>, such that batchSize * messageSize
            /// does not exceed <see cref="Constants.MaxMessageSize"/>. Returns 0 if a single message is larger
            /// than the maximum.
            /// </summary>
            internal static int GetBatchSize(int batchSize, long messageSize) =>
                Math.Min((int)(Constants.MaxMessageSize / Math.Max(1, messageSize)), batchSize);

            static ISinkResult HandleNoIdentity(List<IRoutingMessage> routingMessages)
            {
                Events.InvalidMessageNoIdentity();
                return GetSyncResultForInvalidMessages(new InvalidOperationException("Message does not contain device id"), routingMessages);
            }

            static ISinkResult HandleNoConnection(string identity, List<IRoutingMessage> routingMessages)
            {
                Events.IoTHubNotConnected(identity);
                return GetSyncResultForFailedMessages(new EdgeHubConnectionException($"Could not get connection to IoT Hub for {identity}"), routingMessages);
            }

            static ISinkResult HandleCancelled(List<IRoutingMessage> routingMessages) =>
                GetSyncResultForFailedMessages(new EdgeHubConnectionException("Cancelled sending messages to IotHub"), routingMessages);

            // All messages are reported as failed with a transient failure kind.
            static ISinkResult GetSyncResultForFailedMessages(Exception ex, List<IRoutingMessage> routingMessages)
            {
                var sendFailureDetails = new SendFailureDetails(FailureKind.Transient, ex);
                return new SinkResult<IRoutingMessage>(ImmutableList<IRoutingMessage>.Empty, routingMessages, sendFailureDetails);
            }

            // All messages are reported as invalid (FailureKind.InvalidInput).
            static ISinkResult GetSyncResultForInvalidMessages(Exception ex, List<IRoutingMessage> routingMessages)
            {
                List<InvalidDetails<IRoutingMessage>> invalid = routingMessages
                    .Select(m => new InvalidDetails<IRoutingMessage>(m, FailureKind.InvalidInput))
                    .ToList();
                var sendFailureDetails = new SendFailureDetails(FailureKind.InvalidInput, ex);
                return new SinkResult<IRoutingMessage>(ImmutableList<IRoutingMessage>.Empty, ImmutableList<IRoutingMessage>.Empty, invalid, sendFailureDetails);
            }

            // Awaits the grouped send so that "done processing" is logged only after the work has finished.
            async Task<ISinkResult> ProcessAndLogCompletionAsync(ICollection<IRoutingMessage> routingMessages, CancellationToken token)
            {
                ISinkResult result = await this.ProcessByClients(routingMessages, token);
                Events.DoneProcessing(token);
                return result;
            }

            bool IsRetryable(Exception ex) => ex != null && this.retryableExceptions.Any(re => re.IsInstanceOfType(ex));

            // Groups messages by client identity and processes up to FanOutFactor groups concurrently.
            async Task<ISinkResult> ProcessByClients(ICollection<IRoutingMessage> routingMessages, CancellationToken token)
            {
                var result = new MergingSinkResult<IRoutingMessage>();
                var routingMessageGroups = (from r in routingMessages
                                            group r by this.GetIdentity(r)
                                            into g
                                            select new { Id = g.Key, RoutingMessages = g.ToList() })
                    .ToList();

                Events.ProcessingMessageGroups(routingMessages, routingMessageGroups.Count, this.cloudEndpoint.FanOutFactor);

                foreach (var groupBatch in routingMessageGroups.Batch(this.cloudEndpoint.FanOutFactor))
                {
                    IEnumerable<Task<ISinkResult<IRoutingMessage>>> sendTasks = groupBatch
                        .Select(item => this.ProcessClientMessages(item.Id, item.RoutingMessages, token));
                    ISinkResult<IRoutingMessage>[] sinkResults = await Task.WhenAll(sendTasks);
                    foreach (var res in sinkResults)
                    {
                        result.Merge(res);
                    }
                }

                return result;
            }

            // Process all messages for a particular client
            async Task<ISinkResult<IRoutingMessage>> ProcessClientMessages(string id, List<IRoutingMessage> routingMessages, CancellationToken token)
            {
                var result = new MergingSinkResult<IRoutingMessage>();

                // Find the maximum message size, and divide messages into largest batches
                // not exceeding max allowed IoTHub message size.
                long maxMessageSize = routingMessages.Select(r => r.Size()).Max();
                int batchSize = GetBatchSize(Math.Min(this.cloudEndpoint.maxBatchSize, routingMessages.Count), maxMessageSize);

                // GetBatchSize returns 0 when one message alone exceeds Constants.MaxMessageSize.
                // A zero batch size is not meaningful, so send such messages one at a time instead.
                batchSize = Math.Max(1, batchSize);

                // The enumerator is disposed even when a batch send throws.
                using (var iterator = routingMessages.Batch(batchSize).GetEnumerator())
                {
                    while (iterator.MoveNext())
                    {
                        result.Merge(await this.ProcessClientMessagesBatch(id, iterator.Current.ToList(), token));
                        if (!result.IsSuccessful)
                        {
                            break;
                        }
                    }

                    // if failed earlier, fast-fail the rest
                    while (iterator.MoveNext())
                    {
                        result.AddFailed(iterator.Current);
                    }
                }

                return result;
            }

            // Sends one batch for one client. Sends a single message with SendMessageAsync and
            // larger batches with SendMessageBatchAsync.
            async Task<ISinkResult<IRoutingMessage>> ProcessClientMessagesBatch(string id, List<IRoutingMessage> routingMessages, CancellationToken token)
            {
                if (string.IsNullOrEmpty(id))
                {
                    return HandleNoIdentity(routingMessages);
                }

                if (token.IsCancellationRequested)
                {
                    return HandleCancelled(routingMessages);
                }

                Try<ICloudProxy> cloudProxy = await this.cloudEndpoint.cloudProxyGetterFunc(id);
                if (cloudProxy.Success)
                {
                    var cp = cloudProxy.Value;
                    try
                    {
                        List<IMessage> messages = routingMessages
                            .Select(r => this.cloudEndpoint.messageConverter.ToMessage(r))
                            .ToList();

                        if (messages.Count == 1)
                        {
                            await cp.SendMessageAsync(messages[0]);
                        }
                        else
                        {
                            await cp.SendMessageBatchAsync(messages);
                        }

                        return new SinkResult<IRoutingMessage>(routingMessages);
                    }
                    catch (Exception ex)
                    {
                        return this.HandleException(ex, id, routingMessages);
                    }
                }
                else
                {
                    // Without device-state tracking, every proxy failure is reported as a retryable connection failure.
                    if (this.IsRetryable(cloudProxy.Exception) || !this.trackDeviceState)
                    {
                        return HandleNoConnection(id, routingMessages);
                    }
                    else
                    {
                        return this.HandleException(cloudProxy.Exception, id, routingMessages);
                    }
                }
            }

            // Maps a send exception to a failed (retryable) or invalid (non-retryable) result.
            ISinkResult HandleException(Exception ex, string id, List<IRoutingMessage> routingMessages)
            {
                if (this.IsRetryable(ex))
                {
                    Events.RetryingMessage(id, ex);
                    return GetSyncResultForFailedMessages(new EdgeHubIOException($"Error sending messages to IotHub for device {id}"), routingMessages);
                }
                else
                {
                    Events.InvalidMessage(id, ex);
                    return GetSyncResultForInvalidMessages(ex, routingMessages);
                }
            }

            // Matches the exception types produced by HandleException, HandleNoConnection and HandleCancelled for retryable failures.
            bool IsTransientException(Exception ex) => ex is EdgeHubIOException || ex is EdgeHubConnectionException;

            // Returns "deviceId/moduleId", "deviceId", or string.Empty when no device id is present.
            string GetIdentity(IRoutingMessage routingMessage)
            {
                if (routingMessage.SystemProperties.TryGetValue(SystemProperties.ConnectionDeviceId, out string deviceId))
                {
                    return routingMessage.SystemProperties.TryGetValue(SystemProperties.ConnectionModuleId, out string moduleId)
                        ? $"{deviceId}/{moduleId}"
                        : deviceId;
                }

                Events.DeviceIdNotFound(routingMessage);
                return string.Empty;
            }
        }

        static class Events
        {
            const int IdStart = HubCoreEventIds.CloudEndpoint;
            static readonly ILogger Log = Logger.Factory.CreateLogger<CloudEndpoint>();

            enum EventIds
            {
                DeviceIdNotFound = IdStart,
                IoTHubNotConnected,
                RetryingMessages,
                InvalidMessage,
                ProcessingMessages,
                InvalidMessageNoIdentity,
                CancelledProcessing,
                Created,
                DoneProcessing
            }

            public static void DeviceIdNotFound(IRoutingMessage routingMessage)
            {
                string message = routingMessage.SystemProperties.TryGetValue(SystemProperties.MessageId, out string messageId)
                    ? Invariant($"Message with MessageId {messageId} does not contain a device Id.")
                    : "Received message does not contain a device Id";
                Log.LogWarning((int)EventIds.DeviceIdNotFound, message);
            }

            public static void ProcessingMessages(ICollection<IRoutingMessage> routingMessages)
            {
                Log.LogDebug((int)EventIds.ProcessingMessages, Invariant($"Sending {routingMessages.Count} message(s) upstream."));
            }

            public static void CancelledProcessingMessages(ICollection<IRoutingMessage> messages)
            {
                if (messages.Count > 0)
                {
                    IRoutingMessage firstMessage = messages.OrderBy(m => m.Offset).First();
                    Log.LogDebug((int)EventIds.CancelledProcessing, Invariant($"Cancelled sending messages from offset {firstMessage.Offset}"));
                }
                else
                {
                    Log.LogDebug((int)EventIds.CancelledProcessing, "Cancelled sending messages");
                }
            }

            public static void CancelledProcessingMessage(IRoutingMessage message)
            {
                Log.LogDebug((int)EventIds.CancelledProcessing, Invariant($"Cancelled sending messages from offset {message.Offset}"));
            }

            public static void InvalidMessageNoIdentity()
            {
                Log.LogWarning((int)EventIds.InvalidMessageNoIdentity, "Cannot process message with no identity, discarding it.");
            }

            public static void ProcessingMessageGroups(ICollection<IRoutingMessage> routingMessages, int groups, int fanoutFactor)
            {
                Log.LogDebug((int)EventIds.ProcessingMessages, Invariant($"Sending {routingMessages.Count} message(s) upstream, divided into {groups} groups. Processing maximum {fanoutFactor} groups in parallel."));
            }

            public static void Created(string id, int maxbatchSize, int fanoutFactor)
            {
                Log.LogInformation((int)EventIds.Created, Invariant($"Created cloud endpoint {id} with max batch size {maxbatchSize} and fan-out factor of {fanoutFactor}."));
            }

            public static void DoneProcessing(CancellationToken token)
            {
                if (token.IsCancellationRequested)
                {
                    Log.LogInformation((int)EventIds.CancelledProcessing, "Stopped sending messages to upstream as the operation was cancelled");
                }
                else
                {
                    Log.LogDebug((int)EventIds.DoneProcessing, "Finished processing messages to upstream");
                }
            }

            internal static void IoTHubNotConnected(string id)
            {
                Log.LogWarning((int)EventIds.IoTHubNotConnected, Invariant($"Could not get an active Iot Hub connection for client {id}"));
            }

            internal static void RetryingMessage(string id, Exception ex)
            {
                Log.LogDebug((int)EventIds.RetryingMessages, Invariant($"Retrying sending message from {id} to Iot Hub due to exception {ex.GetType()}:{ex.Message}."));
            }

            internal static void InvalidMessage(string id, Exception ex)
            {
                Log.LogWarning((int)EventIds.InvalidMessage, ex, Invariant($"Non retryable exception occurred while sending message for client {id}."));
            }
        }

        // NOTE(review): these members are private and not referenced anywhere in this file; they appear to be
        // placeholders for metrics reporting (see the no-op LogUserMetrics above). Confirm before removing.
        static class MetricsV0
        {
            static readonly CounterOptions EdgeHubToCloudMessageCountOptions = new CounterOptions
            {
                Name = "EdgeHubToCloudMessageSentCount",
                MeasurementUnit = Unit.Events,
                ResetOnReporting = true,
            };

            static readonly TimerOptions EdgeHubToCloudMessageLatencyOptions = new TimerOptions
            {
                Name = "EdgeHubToCloudMessageLatencyMs",
                MeasurementUnit = Unit.None,
                DurationUnit = TimeUnit.Milliseconds,
                RateUnit = TimeUnit.Seconds
            };

            static MetricTags GetTags(string id)
            {
                return new MetricTags("DeviceId", id);
            }
        }
    }
}