azurepipelines/e2e_test/scenarios/testingtoolkit/_adu_test_toolkit.py (290 lines of code) (raw):

# ------------------------------------------------------------------------- # Copyright (c) Microsoft Corporation. All rights reserved. # Licensed under the MIT License. See License.txt in the project root for # license information. # -------------------------------------------------------------------------- from azure.identity import ManagedIdentityCredential from azure.iot.deviceupdate import DeviceUpdateClient from azure.iot.hub import IoTHubRegistryManager from azure.core.rest import HttpRequest, HttpResponse from azure.iot.hub.protocol.models import Device from azure.iot.hub.protocol.models import Module from azure.iot.hub.protocol.models import Twin from urllib.parse import urljoin import base64 import datetime import getopt import json from msrest.exceptions import HttpOperationError import os import sys import uuid class DuAutomatedTestConfigurationManager(): def __init__(self, aduEndpoint="", aduInstanceId="", iotHubUrl="", iotHubConnectionString="", aadRegistrarClientId="", aadRegistrarTenantId="", pathToCertificate="", passwordForCertificate="") -> None: """ Convenience wrapper for the configuration details required for DU automated tests to run. """ self._aduEndpoint = "" self._aduInstanceId = "" self._iotHubUrl = "" self._iotHubConnectionString = "" self._aadRegistrarClientId = "" self._aadRegistrarTenantId = "" self._pathToCertificate = "" self._passwordForCertificate = "" self._configured = False @classmethod def FromCliArgs(cls): newCls = cls() newCls.ParseFromCLI() return newCls @classmethod def FromOSEnvironment(cls): newCls = cls() newCls.ParseFromOsEnviron() return newCls def ParseFromOsEnviron(self): self._aduEndpoint = os.environ['ADU_ENDPOINT'] self._aduInstanceId = os.environ['ADU_INSTANCE_ID'] self._iotHubUrl = os.environ['IOTHUB_URL'] self._msiTenantId = os.environ['AZURE_TENANT_ID'] self._msiClientId = os.environ['AZURE_CLIENT_ID'] self._msiResourceId = os.environ['AZURE_RESOURCE_ID'] self._configured = True def ParseFromCLI(self): # # Get opt will use the following for arguments. # (-a)--adu-endpoint # (-i)--adu-instance-id # (-u)--iothub-url # (-c)--iothub-connection-string # (-r)--msi-client-id # (-t)--msi-tenant-id # (-p)--msi-resource-id shortopts = "a:i:u:c:r:t:p:" longopts = ['adu-endpoint=', 'adu-instance-id=', 'iothub-url=', 'msi-client-id=', 'msi-tenant-id=', 'msi-resource-id=', ] optlist, args = getopt.getopt(sys.argv[1:], shortopts, longopts) # # Require that all parameters have been passed # if (len(optlist) < 7): return None for opt, val in optlist: if (opt == "-a" or opt == "--" + longopts[0]): self._aduEndpoint = val elif (opt == "-i" or opt == "--" + longopts[1]): self._aduInstanceId = val elif (opt == "-u" or opt == "--" + longopts[2]): self._iotHubUrl = val elif (opt == "-r" or opt == "--" + longopts[4]): self._msiClientId = val elif (opt == "-t" or opt == "--" + longopts[5]): self._msiTenantId = val elif (opt == "-p" or opt == "--" + longopts[6]): self._msiResourceId = val self._configured = True def CreateClientSecretCredential(self): if (not self._configured): print("Calling CreateClientSecretCredential without configuring first") return None return ManagedIdentityCredential() def CreateDeviceUpdateTestHelper(self, credential=None): if (not self._configured): print("Calling CreateDeviceUpdateTestHelper without configuring first") return None self.credential = credential if (self.credential == None): self.credential = self.CreateClientSecretCredential() return DeviceUpdateTestHelper(self._aduInstanceId, self._iotHubUrl, self.credential, self._aduEndpoint) class DeploymentSubGroupStatus(): def __init__(self, subGroupJson) -> None: self.groupId = subGroupJson["groupId"] self.deviceClassId = subGroupJson["deviceClassId"] self.deploymentState = subGroupJson["deploymentState"] self.totalDevices = subGroupJson["totalDevices"] self.devicesInProgressCount = subGroupJson["devicesInProgressCount"] self.devicesCompletedFailedCount = subGroupJson["devicesCompletedFailedCount"] self.devicesCompletedSucceededCount = subGroupJson["devicesCompletedSucceededCount"] self.devicesCanceledCount = subGroupJson["devicesCanceledCount"] if ("error" in subGroupJson): self.error = subGroupJson["error"] class DeploymentStatusResponse(): def __init__(self, deploymentStatusJson) -> None: """ Convenience wrapper object for the deployment status response. Converts the JSON returned by the service request into a Python object that makes it easier to access. You can see the types and potential values of each of the variables here: https://docs.microsoft.com/en-us/rest/api/deviceupdate/2021-06-01-preview/device-management/get-deployment-status """ self.deploymentState = "" self.ParseResponseJson(deploymentStatusJson) def ParseResponseJson(self, deploymentStatusJson): self.deploymentState = deploymentStatusJson["deploymentState"] self.groupId = deploymentStatusJson["groupId"] subgroupStatus = deploymentStatusJson["subgroupStatus"] self.subgroupStatuses = [] for status in subgroupStatus: self.subgroupStatuses.append(DeploymentSubGroupStatus(status)) class DiagnosticsDeviceStatus: def __init__(self,deviceStatusJson): self.deviceId = deviceStatusJson["deviceId"] self.moduleId = "" if ("moduleId" in deviceStatusJson): self.moduleId = deviceStatusJson["moduleId"] self.status = deviceStatusJson["status"] if ("resultCode" in deviceStatusJson): self.resultCode = deviceStatusJson["resultCode"] else: print ("No result code") if ("extendedResultCode" in deviceStatusJson): self.extendedResultCode = deviceStatusJson["extendedResultCode"] else: print("No extended result code") self.logLocation = deviceStatusJson["logLocation"] class DiagnosticLogCollectionStatusResponse(): def __init__(self): """ Convenience wrapper object for the diagnostic log collection status response. Converts the JSON returned by the service request into a Python object that makes it easier to access. You can see the types and potential values of each of the variables here: https://docs.microsoft.com/en-us/rest/api/deviceupdate/2021-06-01-preview/device-management/get-deployment-status """ super().__init__() self.operationId = "" self.status = "" self.createdDateTime = "" self.lastActionDateTime = "" self.deviceStatus = [] def ParseResponseJson(self, logCollectionStatusResponseJson): self.operationId = logCollectionStatusResponseJson["operationId"] self.createdDateTime = logCollectionStatusResponseJson["createdDateTime"] self.lastActionDateTime = logCollectionStatusResponseJson["lastActionDateTime"] self.status = logCollectionStatusResponseJson["status"] deviceStatuses = logCollectionStatusResponseJson["deviceStatus"] for status in deviceStatuses: deviceStatus = DiagnosticsDeviceStatus(status) self.deviceStatus.append(deviceStatus) return self class UpdateId(): def __init__(self, provider, name, version) -> None: """ Convenience wrapper for update-ids being used for deployments following the standard methods. """ self.provider = provider self.name = name self.version = version def __str__(self) -> str: return '{ "provider":"' + str(self.provider) + '", "name": "' + str(self.name) + '", "version": "' + str(self.version) + '"}' class DeviceUpdateTestHelper: def __init__(self, aduInstanceId, iothubUrl, adu_credential, endpoint="") -> None: """ Test Helper for the Device Update Test Automation work. The object wraps different parts of the required functions and works with the azure.iot.deviceupdate.DeviceUpdateClient and azure.iot.hub.IotHubRegistryManager objects to complete all of these operations :param str aduInstanceId: The InstanceId for the DeviceUpdate Account to be connected to :param str credential: The Azure token credential object retrieved from azure-identity :param str endpoint: The endpoint for the Device Update instance :ivar str _aduInstanceId: The InstanceId for the DeviceUpdate Account to be connected to :ivar str _aduEndpoint: The endpoint for the Device Update instance :ivar AzureCredential _credential: The Azure token credential object retrieved from azure-identity :ivar str _iotHubUrl: The url for the iothub to be used for testing :ivar DeviceUpdateClient _aduAcnt: The Device Update Client account object instantiated using the parameters used for DeviceUpdate operations :ivar IotHubRegistryManager _hubRegistryManager: The IotHubRegistryManager object instantiated using the parameters used for IotHub operations """ # Internal Values for managing the connection to DU and the IotHub self._aduInstanceId = aduInstanceId self._aduEndpoint = endpoint self._aduCredential = adu_credential self._iotHubUrl = iothubUrl self._aduAcnt = DeviceUpdateClient(endpoint=endpoint, instance_id=aduInstanceId, credential=adu_credential) self._hubRegistryManager = IoTHubRegistryManager(token_credential=adu_credential,host=iothubUrl) self._base_url = f'https://{self._aduEndpoint}/deviceUpdate/{self._aduInstanceId}/' self._iotHubApiVersion = "?api-version=2021-06-01-preview" self._aduApiVersion= "?api-version=2022-07-01-preview" def CreateSaSDevice(self, deviceId, isIotEdge=False): """ Use the IotHubRegistryManager to create the device using the passed deviceId and isIotEdge parameters using the IotHubRegistryManager :param str deviceId: The device-id to create the device with :param bool isIotEdge: Flag that determines whether the device should be created as an IotEdge device or an IotDevice :returns: A connection string on success; An empty string on failure """ primary_key = base64.b64encode(str(uuid.uuid4()).encode()).decode() secondary_key = base64.b64encode(str(uuid.uuid4()).encode()).decode() device_status = "enabled" self._hubRegistryManager.create_device_with_sas(deviceId, primary_key, secondary_key, device_status, isIotEdge) # Should always be of type Device device = self._hubRegistryManager.get_device(deviceId) if (type(device) != Device): print( "Return type for retrieving the device state is not Device. Requested Raw response?") # Can't guarantee that the device will be connected but the generation-id should work if (device.generation_id == ""): return "" # Once we can confirm that the device has been created we can make the connection string connectionString = "HostName=" + self._iotHubUrl + ";DeviceId=" + deviceId + ";SharedAccessKey=" + primary_key return connectionString def Createx509Device(self, deviceId, primaryX509Thumbprint , secondaryX509Thumbprint, isIotEdge=False): """ Use the IotHubRegistryManager to create the device using the passed deviceId and isIotEdge parameters using the IotHubRegistryManager :param str deviceId: The device-id to create the device with :param bool isIotEdge: Flag that determines whether the device should be created as an IotEdge device or an IotDevice :param primaryX509Thumbprint : The primary thumbprint for the device "param secondaryX509Thumbprint : The secondary thumbprint for the device :returns: A connection string on success; An empty string on failure """ primary_key = primaryX509Thumbprint secondary_key = secondaryX509Thumbprint device_status = "enabled" self._hubRegistryManager.create_device_with_x509(deviceId, primary_key, secondary_key, device_status, isIotEdge) # Should always be of type Device device = self._hubRegistryManager.get_device(deviceId) if (type(device) != Device): print( "Return type for retrieving the device state is not Device. Requested Raw response?") # Can't guarantee that the device will be connected but the generation-id should work if (device.generation_id == ""): return "" # Once we can confirm that the device has been created we can make the connection string connectionString = "HostName=" + self._iotHubUrl + ";DeviceId=" + deviceId + ";SharedAccessKey=" + primary_key return connectionString def DeleteDevice(self, deviceId): """ Deletes the device specified by deviceId :param str deviceId: the identifier of the device to be deleted :returns: True on successful deletion, false otherwise """ try: self._hubRegistryManager.delete_device(deviceId) except HttpOperationError: return False return True def CreateModuleForExistingDevice(self, deviceId, moduleId): """ Use the IotHubRegistryManager to create the module on the device using the passed deviceId and moduleId parameters using the IotHubRegistryManager :param str deviceId: The device on which to create the module :param str moduleId: The module name to be used when creating the module :returns: A connection string on success; An empty string on failure """ primary_key = base64.b64encode(str(uuid.uuid4()).encode()).decode() secondary_key = base64.b64encode(str(uuid.uuid4()).encode()).decode() device_status = "enabled" self._hubRegistryManager.create_module_with_sas(deviceId, moduleId, managed_by="", primary_key=primary_key, secondary_key=secondary_key) module = self._hubRegistryManager.get_module(deviceId, moduleId) if (module.generation_id == ""): return "" # Once we can confirm that the module has been created we can make the connection string connectionString = "HostName=" + self._iotHubUrl + ";DeviceId=" + deviceId + ";ModuleId=" + moduleId + ";SharedAccessKey=" + primary_key return connectionString def DeleteModuleOnDevice(self, deviceId, moduleId): """ Deletes the module specified by moduleId on the device :param str deviceId: the device for the module :param str moduleId: the module to be deleted :returns: True on successful deletion, False otherwise """ try: self._hubRegistryManager.delete_module(deviceId, moduleId) except HttpOperationError: return False return True def AddDeviceToGroup(self, deviceId, groupName): """ Patches the device twin "tags" value on the device twin to include the key ADUGroup and value of the parameter groupName :param str deviceId: the device-id of the device to add to the group :param str groupName: Name of the group to add the device to :returns: True on success; False on failure """ newTagForTwin = Twin(tags={"ADUGroup": groupName}) updatedTwin = self._hubRegistryManager.update_twin(deviceId, newTagForTwin) if (updatedTwin.tags["ADUGroup"] != groupName): return False return True def AddModuleToGroup(self, deviceId, moduleId, groupName): """ Patches the module twin "tags" value on the device twin to include the key ADUGroup and value of the parameter groupName :param str deviceId: the device-id of the device for the module :param str moduleId: the module-id for the module to which to add the group :param str groupName: Name of the group to add the module to :returns: True on success; False on failure """ newTagForTwin = Twin(tags={"ADUGroup": groupName}) twin = self.GetDeviceTwinForDevice(deviceId) updatedTwin = self._hubRegistryManager.update_module_twin(deviceId, moduleId, newTagForTwin) if (updatedTwin.tags["ADUGroup"] != groupName): return False return True def GetDeviceTwinForDevice(self, deviceId): """ Returns the twin of the device :param str deviceId: Identifier for the device to retreive the Twin for :returns: An object of type azure.iot.hub.protocols.models.Twin which encapsulates the twin """ return self._hubRegistryManager.get_twin(deviceId) def GetModuleTwinForModule(self, deviceId, moduleId): """ Returns the twin of the module :param str deviceId: Identifier for the device for the module :param str moduleId: Identifier for the module for which to retrieve the twin :returns: An object of type azure.iot.hub.protocols.models.Twin which encapsulates the twin """ return self._hubRegistryManager.get_module_twin(deviceId, moduleId) def GetConnectionStatusForDevice(self, deviceId): """ Returns the connection state of the device :param str deviceId: the deviceId for the device :returns: the string representation of the connection_state """ return self._hubRegistryManager.get_device(deviceId).connection_state def GetConnectionStatusForModule(self, deviceId, moduleId): """ Returns the connection state of the module :param str deviceId: the deviceId for the device :param str moduleId: the module for which to retrieve the connection_state :returns: the string representation of the connection_state """ return self._hubRegistryManager.get_module(deviceId, moduleId).connection_state def StartDeploymentForGroup(self, deploymentId, groupName, updateId): """ Starts the deployment for the specified groupname and updateId :param str deploymentId: the id for the deployment :param str groupName: the id for the group to which to deploy the update :param str updateId: the update-id for the deployment :param str rollbackPolicy: the string version of the rollback policy :param str failure: the string body of the failure definition :returns: the status code for the deployment request, 200 for success, all other values are failures """ requestString = "/deviceUpdate/" + self._aduInstanceId + "/management/groups/" + groupName + "/deployments/" + deploymentId + self._aduApiVersion if (type(updateId) != UpdateId): print("Unusable type for updateId, use the UpdateId class") jsonBodyString = '{"deploymentId": "' + deploymentId + '","groupId": "' + groupName + '","startDateTime": "' + str(datetime.datetime.now()) + '","update":{ "updateId": ' + str(updateId) + ' } }' deploymentStartRequest = HttpRequest("PUT", requestString, json=json.loads(jsonBodyString)) deploymentStartResponse = self._aduAcnt.send_request(deploymentStartRequest) return deploymentStartResponse.status_code def StopDeployment(self, deploymentId, groupName, deviceClassId): """ Stops the deployment for the specified groupname :param str deploymentId: the id for the deployment :param str groupName: the id for the group to which to cancel the deployment :returns: the status code for the deployment cancel request, 200 for success, all other values are failures """ # /deviceUpdate/{instanceId}/management/groups/{groupId}/deviceClassSubgroups/{deviceClassId}/deployments/{deploymentId} requestString = "/deviceUpdate/" + self._aduInstanceId + "/management/groups/" + groupName + "/deployments/" + deploymentId + self._aduApiVersion deploymentCancelRequest = HttpRequest("POST", requestString) deploymentCancelResponse = self._aduAcnt.send_request(deploymentCancelRequest) return deploymentCancelResponse.status_code def GetDeploymentStatusForGroup(self, deploymentId, groupName): """ Returns the deployment state for the given deploymentId and groupName :param str deploymentId: the id for the deployment :param str groupName: the id for the group to which the deployment was made :returns: An object of type DeploymentStatusResponse, this will be empty on failure """ deploymentStatusRequest = HttpRequest("GET", "/deviceUpdate/" + self._aduInstanceId + "/management/groups/" + groupName + "/deployments/" + deploymentId + "/status" + self._aduApiVersion) deploymentStatusResponse = self._aduAcnt.send_request(deploymentStatusRequest) if (deploymentStatusResponse.status_code != 200): return None deploymentStateResponseJson = DeploymentStatusResponse(deploymentStatusResponse.json()) return deploymentStateResponseJson def DeleteDeployment(self, deploymentId, groupId): """ Deletes the deployment specified by deploymentId for group specified by groupId :param str deploymentId: the identifier for the deployment to be deleted :param str groupId: the group-id for group on which the deployment was operating :returns: the status code of the response to delete the deployment """ requestString = '/deviceUpdate/' + self._aduInstanceId + '/management/groups/' + groupId + '/deployments/' + deploymentId + self._aduApiVersion deleteDeploymentRequest = HttpRequest("DELETE", requestString) deleteDeploymentResponse = self._aduAcnt.send_request(deleteDeploymentRequest) return deleteDeploymentResponse.status_code def RunDiagnosticsOnDeviceOrModule(self,deviceId,operationId,description,moduleId=""): """ Initiates a diagnostics log collection flow for the specified device or module using the operationId and description passed :param str deviceId: the device-id to target :param str operationId: the operation-id for the diagnostics workflow :param str description: the description for the diagnostics workflow :param str moduleId: Optional parameter only to be specified when targeting a module :returns: the status code of the request to start the diagnostics operation """ jsonBody = None if (len(moduleId) == 0): jsonBody = { "deviceList":[ { "deviceId": deviceId }, ], "description": description, "operationId":operationId } else: jsonBody = { "deviceList": [ { "deviceId": deviceId, "moduleId": moduleId } ], "description": description, "operationId":operationId } collectLog_url = f'management/deviceDiagnostics/logCollections/{operationId}{self._iotHubApiVersion}' requestString = urljoin(self._base_url, collectLog_url) diagnosticsRequest = HttpRequest("PUT", requestString, json=jsonBody) diagnosticsResponse = self._aduAcnt.send_request(diagnosticsRequest) return diagnosticsResponse.status_code def GetDiagnosticsLogCollectionStatus(self, operationId): """ Returns the log collection state for the given operationId :param str operationId: Log collection operation identifier :returns: An object of type DiagnosticLogCollectionStatusResponse, this will be empty on failure """ getLogCollectDetail_url = f'management/deviceDiagnostics/logCollections/{operationId}/detailedstatus{self._aduApiVersion}' requestString = urljoin(self._base_url, getLogCollectDetail_url) logCollectionStatusRequest = HttpRequest("GET", requestString) logCollectionStatusResponse = self._aduAcnt.send_request(logCollectionStatusRequest) if (logCollectionStatusResponse.status_code != 200): return DiagnosticLogCollectionStatusResponse() logCollectionStatusResponseJson = DiagnosticLogCollectionStatusResponse().ParseResponseJson(logCollectionStatusResponse.json()) return logCollectionStatusResponseJson def DeleteADUGroup(self, aduGroupId): """ Deletes the ADUGroup declared by aduGroupId :param str aduGroupId: the ADU Group to delete :returns: the status code of the response """ requestString = '/deviceUpdate/' + self._aduInstanceId + '/management/groups/' + aduGroupId + self._aduApiVersion deleteAduGroupRequest = HttpRequest("DELETE", requestString) deleteAduGroupResponse = self._aduAcnt.send_request(deleteAduGroupRequest) return deleteAduGroupResponse.status_code