smoke/IotEdgeQuickstart/details/Details.cs (84 lines of code) (raw):

// Copyright (c) Microsoft. All rights reserved. namespace IotEdgeQuickstart.Details { using System; using System.Collections.Generic; using System.IO; using System.Linq; using System.Net; using System.Text.RegularExpressions; using System.Threading; using System.Threading.Tasks; using Microsoft.Azure.Devices; using Microsoft.Azure.Devices.Common.Exceptions; using Microsoft.Azure.Devices.Edge.Test.Common; using Microsoft.Azure.Devices.Edge.Util; using Microsoft.Azure.Devices.Edge.Util.TransientFaultHandling; using Microsoft.Azure.Devices.Shared; using Microsoft.Azure.EventHubs; using Newtonsoft.Json; using Newtonsoft.Json.Linq; using EventHubClientTransportType = Microsoft.Azure.EventHubs.TransportType; using RetryPolicy = Microsoft.Azure.Devices.Edge.Util.TransientFaultHandling.RetryPolicy; using ServiceClientTransportType = Microsoft.Azure.Devices.TransportType; public class Details { public readonly Option<string> DeploymentFileName; public readonly Option<string> TwinTestFileName; const string DeployJson = @" { ""modulesContent"": { ""$edgeAgent"": { ""properties.desired"": { ""schemaVersion"": ""1.0"", ""runtime"": { ""type"": ""docker"", ""settings"": { ""minDockerVersion"": ""v1.25"", ""loggingOptions"": """"<registry-info> } }, ""systemModules"": { ""edgeAgent"": { ""type"": ""docker"", ""settings"": { ""image"": ""<image-edge-agent>"", ""createOptions"": """" } }, ""edgeHub"": { ""type"": ""docker"", ""status"": ""running"", ""restartPolicy"": ""always"", ""settings"": { ""image"": ""<image-edge-hub>"", ""createOptions"": ""{\""HostConfig\"":{\""PortBindings\"":{\""8883/tcp\"":[{\""HostPort\"":\""8883\""}],\""443/tcp\"":[{\""HostPort\"":\""443\""}],\""5671/tcp\"":[{\""HostPort\"":\""5671\""}]}}}"" }, ""env"": { ""OptimizeForPerformance"": { ""value"": ""<optimized-for-performance>"" }, ""NestedEdgeEnabled"": { ""value"": ""false"" } }, } }, ""modules"": { ""tempSensor"": { ""version"": ""1.0"", ""type"": ""docker"", ""status"": ""running"", ""restartPolicy"": ""always"", ""settings"": { ""image"": ""<image-temp-sensor>"", ""createOptions"": """" } } } } }, ""$edgeHub"": { ""properties.desired"": { ""schemaVersion"": ""1.0"", ""routes"": { ""route"": ""FROM /* INTO $upstream"" }, ""storeAndForwardConfiguration"": { ""timeToLiveSecs"": 7200 } } } } } "; const string DeployJsonRegistry = @" ,""registryCredentials"": { ""registry"": { ""address"": ""<registry-address>"", ""username"": ""<registry-username>"", ""password"": ""<registry-password>"" } } "; readonly IBootstrapper bootstrapper; readonly Option<RegistryCredentials> credentials; readonly string iothubConnectionString; readonly Option<DPSAttestation> dpsAttestation; readonly string eventhubCompatibleEndpointWithEntityPath; readonly ServiceClientTransportType serviceClientTransportType; readonly EventHubClientTransportType eventHubClientTransportType; readonly string imageTag; readonly string deviceId; readonly string hostname; readonly Option<string> parentHostname; readonly Option<string> parentEdgeDevice; readonly string deviceCaCert; readonly string deviceCaPk; readonly string deviceCaCerts; readonly bool optimizedForPerformance; readonly bool initializeWithAgentArtifact; readonly LogLevel runtimeLogLevel; readonly bool cleanUpExistingDeviceOnSuccess; DeviceContext context; Option<IWebProxy> proxy; protected Details( IBootstrapper bootstrapper, Option<RegistryCredentials> credentials, string iothubConnectionString, string eventhubCompatibleEndpointWithEntityPath, UpstreamProtocolType upstreamProtocol, Option<string> proxy, string imageTag, string deviceId, string hostname, Option<string> parentHostname, Option<string> parentEdgeDevice, Option<string> deploymentFileName, Option<string> twinTestFileName, string deviceCaCert, string deviceCaPk, string deviceCaCerts, bool optimizedForPerformance, bool initializeWithAgentArtifact, LogLevel runtimeLogLevel, bool cleanUpExistingDeviceOnSuccess, Option<DPSAttestation> dpsAttestation) { this.bootstrapper = bootstrapper; this.credentials = credentials; this.iothubConnectionString = iothubConnectionString; this.dpsAttestation = dpsAttestation; this.eventhubCompatibleEndpointWithEntityPath = eventhubCompatibleEndpointWithEntityPath; switch (upstreamProtocol) { case UpstreamProtocolType.Amqp: case UpstreamProtocolType.Mqtt: this.serviceClientTransportType = ServiceClientTransportType.Amqp; this.eventHubClientTransportType = EventHubClientTransportType.Amqp; break; case UpstreamProtocolType.AmqpWs: case UpstreamProtocolType.MqttWs: this.serviceClientTransportType = ServiceClientTransportType.Amqp_WebSocket_Only; this.eventHubClientTransportType = EventHubClientTransportType.AmqpWebSockets; break; default: throw new Exception($"Unexpected upstream protocol type {upstreamProtocol}"); } this.imageTag = imageTag; this.deviceId = deviceId; this.hostname = hostname; this.parentHostname = parentHostname; this.parentEdgeDevice = parentEdgeDevice; this.DeploymentFileName = deploymentFileName; this.TwinTestFileName = twinTestFileName; this.deviceCaCert = deviceCaCert; this.deviceCaPk = deviceCaPk; this.deviceCaCerts = deviceCaCerts; this.optimizedForPerformance = optimizedForPerformance; this.initializeWithAgentArtifact = initializeWithAgentArtifact; this.runtimeLogLevel = runtimeLogLevel; this.cleanUpExistingDeviceOnSuccess = cleanUpExistingDeviceOnSuccess; this.proxy = proxy.Map(p => new WebProxy(p) as IWebProxy); } protected Task UpdatePackageState() { Console.WriteLine("Checking if aziot-edge and aziot-identity-service are installed."); return this.bootstrapper.UpdatePackageState(); } protected Task VerifyBootstrapperDependencies() { Console.WriteLine("Verifying bootstrapper dependencies."); return this.bootstrapper.VerifyDependenciesAreInstalled(); } protected Task InstallBootstrapper() { Console.WriteLine("Installing bootstrapper."); return this.bootstrapper.Install(); } protected async Task GetOrCreateEdgeDeviceIdentity() { Console.WriteLine("Getting or Creating device Identity."); var settings = new HttpTransportSettings(); this.proxy.ForEach(p => settings.Proxy = p); IotHubConnectionStringBuilder builder = IotHubConnectionStringBuilder.Create(this.iothubConnectionString); RegistryManager rm = RegistryManager.CreateFromConnectionString(builder.ToString(), settings); Device device = await rm.GetDeviceAsync(this.deviceId); if (device != null) { Console.WriteLine($"Device '{device.Id}' already registered on IoT hub '{builder.HostName}'"); Console.WriteLine($"Clean up Existing device? {this.cleanUpExistingDeviceOnSuccess}"); this.context = new DeviceContext(device, this.iothubConnectionString, rm, this.cleanUpExistingDeviceOnSuccess); } else { // if dpsAttestion is enabled, do not create a device as the // ESD will register with DPS to create the device in IoT Hub if (this.dpsAttestation.HasValue) { this.context = new DeviceContext(this.deviceId, this.iothubConnectionString, rm, this.cleanUpExistingDeviceOnSuccess); } else { await this.CreateEdgeDeviceIdentity(rm); } } } protected Task ConfigureBootstrapper() { Console.WriteLine("Configuring bootstrapper."); DeviceProvisioningMethod method = this.dpsAttestation.Match( dps => { return new DeviceProvisioningMethod(dps); }, () => { IotHubConnectionStringBuilder builder = IotHubConnectionStringBuilder.Create(this.context.IotHubConnectionString); Device device = this.context.Device.Expect(() => new InvalidOperationException("Expected a valid device instance")); string connectionString = $"HostName={builder.HostName};" + $"DeviceId={device.Id};" + $"SharedAccessKey={device.Authentication.SymmetricKey.PrimaryKey}"; return new DeviceProvisioningMethod(connectionString); }); Option<string> agentImage = Option.None<string>(); if (this.initializeWithAgentArtifact) { agentImage = Option.Some<string>(this.EdgeAgentImage()); } return this.bootstrapper.Configure(method, agentImage, this.hostname, this.parentHostname, this.deviceCaCert, this.deviceCaPk, this.deviceCaCerts, this.runtimeLogLevel); } protected Task StartBootstrapper() { Console.WriteLine("Starting bootstrapper."); return this.bootstrapper.Start(); } protected Task VerifyEdgeAgentIsRunning() { Console.WriteLine("Verifying if edge Agent is running."); return this.bootstrapper.VerifyModuleIsRunning("edgeAgent"); } protected async Task VerifyEdgeAgentIsConnectedToIotHub() { Console.WriteLine("Verifying if edge is connected to IoT Hub."); using (var cts = new CancellationTokenSource(TimeSpan.FromSeconds(600))) // Long timeout is needed because registry manager takes a while for the device identity to be usable { Exception savedException = null; try { var settings = new ServiceClientTransportSettings(); this.proxy.ForEach(p => settings.HttpProxy = p); ServiceClient serviceClient = ServiceClient.CreateFromConnectionString(this.context.IotHubConnectionString, this.serviceClientTransportType, settings); while (!cts.IsCancellationRequested) { await Task.Delay(TimeSpan.FromSeconds(10), cts.Token); try { CloudToDeviceMethodResult result = await serviceClient.InvokeDeviceMethodAsync( this.context.DeviceId, "$edgeAgent", new CloudToDeviceMethod("ping"), cts.Token); if (result.Status == 200) { break; } } catch (Exception e) { savedException = e; } } } catch (OperationCanceledException e) { throw new Exception($"Failed to ping $edgeAgent from the cloud: {savedException?.Message ?? e.Message}"); } catch (Exception e) { throw new Exception($"Failed to ping $edgeAgent from the cloud: {e.Message}"); } } } protected Task DeployToEdgeDevice() { Console.WriteLine("Deploying edge device."); (string deployJson, string[] modules) = this.DeploymentJson(); Console.WriteLine($"Sending configuration to device '{this.context.DeviceId}' with modules:"); foreach (string module in modules) { Console.WriteLine($" {module}"); } var config = JsonConvert.DeserializeObject<ConfigurationContent>(deployJson); var retryStrategy = new Incremental(15, RetryStrategy.DefaultRetryInterval, RetryStrategy.DefaultRetryIncrement); var retryPolicy = new RetryPolicy(new TransientNetworkErrorDetectionStrategy(), retryStrategy); return retryPolicy.ExecuteAsync( async () => { Console.WriteLine("Attempting to apply configuration on device..."); await this.context.RegistryManager.ApplyConfigurationContentOnDeviceAsync(this.context.DeviceId, config); }, new CancellationTokenSource(TimeSpan.FromMinutes(10)).Token); } protected async Task VerifyDataOnIoTHub(string moduleId) { Console.WriteLine($"Verifying data on IoTHub from {moduleId}"); // First Verify if module is already running. await this.bootstrapper.VerifyModuleIsRunning(moduleId); var builder = new EventHubsConnectionStringBuilder(this.eventhubCompatibleEndpointWithEntityPath) { TransportType = this.eventHubClientTransportType }; Console.WriteLine($"Receiving events from device '{this.context.DeviceId}' on Event Hub '{builder.EntityPath}'"); EventHubClient eventHubClient = EventHubClient.CreateFromConnectionString(builder.ToString()); this.proxy.ForEach(p => eventHubClient.WebProxy = p); PartitionReceiver eventHubReceiver = eventHubClient.CreateReceiver( "$Default", EventHubPartitionKeyResolver.ResolveToPartition( this.context.DeviceId, (await eventHubClient.GetRuntimeInformationAsync()).PartitionCount), EventPosition.FromEnd()); // TODO: [Improvement] should verify test results without using event hub, which introduce latency. var result = new TaskCompletionSource<bool>(); using (var cts = new CancellationTokenSource(TimeSpan.FromMinutes(20))) // This long timeout is needed in case event hub is slow to process messages { using (cts.Token.Register(() => result.TrySetCanceled())) { eventHubReceiver.SetReceiveHandler( new PartitionReceiveHandler( eventData => { eventData.SystemProperties.TryGetValue("iothub-connection-device-id", out object devId); eventData.SystemProperties.TryGetValue("iothub-connection-module-id", out object modId); if (devId != null && devId.ToString().Equals(this.context.DeviceId) && modId != null && modId.ToString().Equals(moduleId)) { result.TrySetResult(true); return true; } return false; })); await result.Task; } } Console.WriteLine("VerifyDataOnIoTHub completed."); await eventHubReceiver.CloseAsync(); await eventHubClient.CloseAsync(); } protected async Task VerifyTwinAsync() { await this.TwinTestFileName.ForEachAsync( async fileName => { Console.WriteLine($"VerifyTwinAsync for {fileName} started."); string twinTestJson = File.ReadAllText(fileName); var twinTest = JsonConvert.DeserializeObject<TwinTestConfiguration>(twinTestJson); Twin currentTwin = await this.context.RegistryManager.GetTwinAsync(this.context.DeviceId, twinTest.ModuleId); if (twinTest.Properties?.Desired?.Count > 0) { // Build Patch Object. string patch = JsonConvert.SerializeObject(twinTest, Formatting.Indented); await this.context.RegistryManager.UpdateTwinAsync(this.context.DeviceId, twinTest.ModuleId, patch, currentTwin.ETag); } if (twinTest.Properties?.Reported?.Count > 0) { TimeSpan retryInterval = TimeSpan.FromSeconds(10); bool IsValid(TwinCollection currentTwinReportedProperty) => twinTest.Properties.Reported.Cast<KeyValuePair<string, object>>().All(p => currentTwinReportedProperty.Cast<KeyValuePair<string, object>>().Contains(p)); using (var cts = new CancellationTokenSource(TimeSpan.FromSeconds(20))) { async Task<TwinCollection> Func() { // Removing reSharper warning for CTS, Code Block will never exit before the delegate code completes because of using. // ReSharper disable AccessToDisposedClosure currentTwin = await this.context.RegistryManager.GetTwinAsync(this.context.DeviceId, twinTest.ModuleId, cts.Token); // ReSharper restore AccessToDisposedClosure return await Task.FromResult(currentTwin.Properties.Reported); } await Retry.Do(Func, IsValid, null, retryInterval, cts.Token); } } Console.WriteLine($"VerifyTwinAsync for {fileName} completed."); }); } protected Task RemoveTempSensorFromEdgeDevice() { Console.WriteLine("Removing tempSensor from edge device."); (string deployJson, string[] _) = this.DeploymentJson(); var config = JsonConvert.DeserializeObject<ConfigurationContent>(deployJson); JObject desired = JObject.FromObject(config.ModulesContent["$edgeAgent"]["properties.desired"]); if (desired.TryGetValue("modules", out JToken modules)) { IList<JToken> removeList = new List<JToken>(); foreach (JToken module in modules.Children()) { removeList.Add(module); } foreach (JToken module in removeList) { module.Remove(); } } config.ModulesContent["$edgeAgent"]["properties.desired"] = desired; return this.context.RegistryManager.ApplyConfigurationContentOnDeviceAsync(this.context.DeviceId, config); } protected Task StopBootstrapper() { Console.WriteLine("Stopping bootstrapper."); return this.bootstrapper.Stop(); } protected Task ResetBootstrapper() { Console.WriteLine("Resetting bootstrapper."); return this.bootstrapper.Reset(); } protected void KeepEdgeDeviceIdentity() { Console.WriteLine("Keeping Edge Device Identity."); if (this.context != null) { this.context.RemoveDevice = false; } } protected Task MaybeDeleteEdgeDeviceIdentity() { if (this.context != null) { return this.context.DeleteDevice(); } return Task.CompletedTask; } async Task CreateEdgeDeviceIdentity(RegistryManager rm) { var device = new Device(this.deviceId) { Authentication = new AuthenticationMechanism() { Type = AuthenticationType.Sas }, Capabilities = new DeviceCapabilities() { IotEdge = true } }; await this.parentEdgeDevice.ForEachAsync(async p => { var parentDevice = await rm.GetDeviceAsync(p); device.ParentScopes.Add(parentDevice.Scope); }); IotHubConnectionStringBuilder builder = IotHubConnectionStringBuilder.Create(this.iothubConnectionString); Console.WriteLine($"Registering device '{device.Id}' on IoT hub '{builder.HostName}'"); var retryStrategy = new Incremental(15, RetryStrategy.DefaultRetryInterval, RetryStrategy.DefaultRetryIncrement); var retryPolicy = new RetryPolicy(new TransientNetworkErrorDetectionStrategy(), retryStrategy); await retryPolicy.ExecuteAsync( async () => { Console.WriteLine("Attempting to create device identity..."); device = await rm.AddDeviceAsync(device); }, new CancellationTokenSource(TimeSpan.FromMinutes(10)).Token); this.context = new DeviceContext(device, builder.ToString(), rm, true); } string EdgeAgentImage() { return this.BuildImageName("azureiotedge-agent"); } string EdgeHubImage() { return this.BuildImageName("azureiotedge-hub"); } string TempSensorImage() { return this.BuildImageName("azureiotedge-simulated-temperature-sensor"); } string BuildImageName(string name) { string prefix = this.credentials.Match(c => $"{c.Address}/microsoft", () => "mcr.microsoft.com"); return $"{prefix}/{name}:{this.imageTag}"; } (string, string[]) DeploymentJson() { string edgeAgentImage = this.EdgeAgentImage(); string edgeHubImage = this.EdgeHubImage(); string tempSensorImage = this.TempSensorImage(); string deployJson = this.DeploymentFileName.Match( f => { Console.WriteLine($"Deployment file used: {f}"); return JObject.Parse(File.ReadAllText(f)).ToString(); }, () => { string deployJsonRegistry = this.credentials.Match( c => { string jsonRegistry = DeployJsonRegistry; jsonRegistry = Regex.Replace(jsonRegistry, "<registry-address>", c.Address); jsonRegistry = Regex.Replace(jsonRegistry, "<registry-username>", c.User); jsonRegistry = Regex.Replace(jsonRegistry, "<registry-password>", c.Password); return jsonRegistry; }, () => string.Empty); string json = DeployJson; json = Regex.Replace(json, "<image-edge-agent>", edgeAgentImage); json = Regex.Replace(json, "<image-edge-hub>", edgeHubImage); json = Regex.Replace(json, "<image-temp-sensor>", tempSensorImage); json = Regex.Replace(json, "<registry-info>", deployJsonRegistry); json = Regex.Replace(json, "<optimized-for-performance>", this.optimizedForPerformance.ToString()); return json; }); return (deployJson, new[] { edgeAgentImage, edgeHubImage, tempSensorImage }); } } class TransientNetworkErrorDetectionStrategy : ITransientErrorDetectionStrategy { public bool IsTransient(Exception ex) { return ex is IotHubCommunicationException && ex.Message.Contains("The POST operation timed out"); } } public class DeviceContext { public DeviceContext(string deviceId, string iothubConnectionString, RegistryManager rm, bool removeDevice) { this.DeviceId = deviceId; this.Device = Option.None<Device>(); this.IotHubConnectionString = iothubConnectionString; this.RegistryManager = rm; this.RemoveDevice = removeDevice; } public DeviceContext(Device device, string iothubConnectionString, RegistryManager rm, bool removeDevice) { this.DeviceId = device.Id; this.Device = Option.Some(device); this.IotHubConnectionString = iothubConnectionString; this.RegistryManager = rm; this.RemoveDevice = removeDevice; } public Option<Device> Device { get; } public string DeviceId { get; } public string IotHubConnectionString { get; } public RegistryManager RegistryManager { get; } public bool RemoveDevice { get; set; } public Task DeleteDevice() { if (this.RemoveDevice) { Console.WriteLine($"Trying to remove device from Registry. Device Id: {this.DeviceId}"); return this.RegistryManager.RemoveDeviceAsync(this.DeviceId); } return Task.CompletedTask; } } }