azext_iot/_factory.py (88 lines of code) (raw):

# coding=utf-8 # -------------------------------------------------------------------------------------------- # Copyright (c) Microsoft Corporation. All rights reserved. # Licensed under the MIT License. See License.txt in the project root for license information. # -------------------------------------------------------------------------------------------- """ Factory functions for IoT Hub and Device Provisioning Service. """ from azext_iot.common.sas_token_auth import SasTokenAuthentication from azext_iot.common.auth import IoTOAuth from azext_iot.common.shared import SdkType, AuthenticationTypeDataplane from azext_iot.constants import ( USER_AGENT, IOTHUB_RESOURCE_ID, IOTDPS_RESOURCE_ID ) from msrestazure.azure_exceptions import CloudError __all__ = [ "SdkResolver", "CloudError", "iot_hub_service_factory", "iot_service_provisioning_factory", ] def iot_hub_service_factory(cli_ctx, *_): """ Factory for importing deps and getting service client resources. Args: cli_ctx (knack.cli.CLI): CLI context. *_ : all other args ignored. Returns: service_client (IoTHubClient): operational resource for working with IoT Hub Service. """ from azure.cli.core.commands.client_factory import get_mgmt_service_client from azure.cli.core.profiles import ResourceType return get_mgmt_service_client(cli_ctx, ResourceType.MGMT_IOTHUB) def iot_service_provisioning_factory(cli_ctx, *_): """ Factory for importing deps and getting service client resources. Args: cli_ctx (knack.cli.CLI): CLI context. *_ : all other args ignored. Returns: service_client (IotDpsClient): operational resource for working with IoT Hub Device Provisioning Service. """ from azure.cli.core.commands.client_factory import get_mgmt_service_client from azure.cli.core.profiles import ResourceType return get_mgmt_service_client(cli_ctx, ResourceType.MGMT_IOTDPS) class SdkResolver(object): def __init__(self, target, device_id=None, auth_override=None): self.target = target self.device_id = device_id self.auth_override = auth_override # This initialization will likely need to change to support more variation of SDK self.sas_uri = self.target["entity"] self.endpoint = "https://{}".format(self.sas_uri) # Base endpoints stay the same if self.device_id: self.sas_uri = "{}/devices/{}".format(self.sas_uri, self.device_id) def get_sdk(self, sdk_type): sdk_map = self._construct_sdk_map() sdk_client = sdk_map[sdk_type]() sdk_client.config.enable_http_logger = True sdk_client.config.add_user_agent(USER_AGENT) return sdk_client def _construct_sdk_map(self): return { SdkType.service_sdk: self._get_iothub_service_sdk, # Don't need to call here SdkType.device_sdk: self._get_iothub_device_sdk, SdkType.dps_sdk: self._get_dps_service_sdk, } def _get_iothub_device_sdk(self): from azext_iot.sdk.iothub.device import IotHubGatewayDeviceAPIs credentials = SasTokenAuthentication( uri=self.sas_uri, shared_access_policy_name=self.target["policy"], shared_access_key=self.target["primarykey"], ) return IotHubGatewayDeviceAPIs(credentials=credentials, base_url=self.endpoint) def _get_iothub_service_sdk(self): from azext_iot.sdk.iothub.service import IotHubGatewayServiceAPIs credentials = None if self.auth_override: credentials = self.auth_override elif self.target["policy"] == AuthenticationTypeDataplane.login.value: credentials = IoTOAuth( cli_ctx=self.target["cmd"].cli_ctx, resource_id=IOTHUB_RESOURCE_ID ) else: credentials = SasTokenAuthentication( uri=self.sas_uri, shared_access_policy_name=self.target["policy"], shared_access_key=self.target["primarykey"], ) return IotHubGatewayServiceAPIs(credentials=credentials, base_url=self.endpoint) def _get_dps_service_sdk(self): from azext_iot.sdk.dps.service import ProvisioningServiceClient credentials = None if self.auth_override: credentials = self.auth_override elif self.target["policy"] == AuthenticationTypeDataplane.login.value: credentials = IoTOAuth( cli_ctx=self.target["cmd"].cli_ctx, resource_id=IOTDPS_RESOURCE_ID ) else: credentials = SasTokenAuthentication( uri=self.sas_uri, shared_access_policy_name=self.target["policy"], shared_access_key=self.target["primarykey"], ) return ProvisioningServiceClient( credentials=credentials, base_url=self.endpoint )