# coding=utf-8
# --------------------------------------------------------------------------------------------
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License. See License.txt in the project root for license information.
# --------------------------------------------------------------------------------------------

from os.path import exists
from knack.log import get_logger
from enum import Enum, EnumMeta
from tqdm import tqdm
from azure.cli.core.azclierror import (
    ArgumentUsageError,
    CLIInternalError,
    ClientRequestError,
    FileOperationError,
    InvalidArgumentValueError,
    RequiredArgumentMissingError,
    ResourceNotFoundError,
    ValidationError,
)
from azext_iot.constants import (
    DEVICE_DEVICESCOPE_PREFIX,
    IOTHUB_RENEW_KEY_BATCH_SIZE,
    IOTHUB_THROTTLE_MAX_TRIES,
    IOTHUB_THROTTLE_SLEEP_SEC,
    THROTTLE_HTTP_STATUS_CODE,
    TRACING_PROPERTY,
    TRACING_ALLOWED_FOR_LOCATION,
    TRACING_ALLOWED_FOR_SKU,
)
from azext_iot.common.sas_token_auth import SasTokenAuthentication
from azext_iot.common.shared import (
    DeviceAuthType,
    SdkType,
    ConfigType,
    KeyType,
    RenewKeyType,
    IoTHubStateType,
    DeviceAuthApiType,
    ConnectionStringParser,
    EntityStatusType,
    JobType
)
from azext_iot.iothub.providers.discovery import IotHubDiscovery
from azext_iot.common.utility import (
    assemble_nargs_to_dict,
    handle_service_exception,
    read_file_content,
    init_monitoring,
    process_json_arg,
    generate_storage_account_sas_token,
)
from azext_iot._factory import SdkResolver, CloudError
from azext_iot.operations.generic import _execute_query
from typing import Optional
import pprint

logger = get_logger(__name__)
printer = pprint.PrettyPrinter(indent=2)


# Query

def iot_query(
    cmd,
    query_command,
    hub_name_or_hostname=None,
    top=None,
    resource_group_name=None,
    login=None,
    auth_type_dataplane=None,
):
    """Run a query against the IoT Hub resolved from the given arguments.

    The hub target is looked up through ``IotHubDiscovery`` and the query is
    delegated to ``_iot_query``; its result is returned unchanged.
    """
    hub_target = IotHubDiscovery(cmd).get_target(
        resource_name=hub_name_or_hostname,
        resource_group_name=resource_group_name,
        login=login,
        auth_type=auth_type_dataplane,
    )
    return _iot_query(target=hub_target, query_command=query_command, top=top)


def _iot_query(target, query_command, top=None):
    """Execute ``query_command`` with the service SDK's twin query method.

    The query text is wrapped in a single-element argument list and handed,
    together with ``service_sdk.query.get_twins`` and ``top``, to
    ``_execute_query``. A ``CloudError`` is passed to
    ``handle_service_exception``.
    """
    service_sdk = SdkResolver(target=target).get_sdk(SdkType.service_sdk)

    try:
        return _execute_query([query_command], service_sdk.query.get_twins, top)
    except CloudError as e:
        handle_service_exception(e)


# Device


def iot_device_show(
    cmd,
    device_id,
    hub_name_or_hostname=None,
    resource_group_name=None,
    login=None,
    auth_type_dataplane=None,
):
    """Return the identity of ``device_id`` on the resolved IoT Hub.

    Resolves the hub target through ``IotHubDiscovery`` and delegates to
    ``_iot_device_show``.
    """
    hub_target = IotHubDiscovery(cmd).get_target(
        resource_name=hub_name_or_hostname,
        resource_group_name=resource_group_name,
        login=login,
        auth_type=auth_type_dataplane,
    )
    return _iot_device_show(target=hub_target, device_id=device_id)


def _iot_device_show(target, device_id):
    """Fetch a device identity as a dict and tag it with the hub entity.

    The raw service response is decoded from JSON and the value of
    ``target.get("entity")`` is stored under the ``"hub"`` key before the
    dict is returned. A ``CloudError`` is passed to
    ``handle_service_exception``.
    """
    service_sdk = SdkResolver(target=target).get_sdk(SdkType.service_sdk)

    try:
        raw_result = service_sdk.devices.get_identity(id=device_id, raw=True)
        identity = raw_result.response.json()
        identity["hub"] = target.get("entity")
        return identity
    except CloudError as e:
        handle_service_exception(e)


def iot_device_create(
    cmd,
    device_id,
    hub_name_or_hostname=None,
    edge_enabled=False,
    auth_method=DeviceAuthType.shared_private_key.value,
    primary_key=None,
    secondary_key=None,
    primary_thumbprint=None,
    secondary_thumbprint=None,
    status=EntityStatusType.enabled.value,
    status_reason=None,
    valid_days=None,
    output_dir=None,
    device_scope=None,
    resource_group_name=None,
    login=None,
    auth_type_dataplane=None,
):
    """Create a device identity on the resolved IoT Hub.

    Resolves the hub target through ``IotHubDiscovery`` and forwards all
    device options to ``_iot_device_create``, returning its result.
    """
    hub_target = IotHubDiscovery(cmd).get_target(
        resource_name=hub_name_or_hostname,
        resource_group_name=resource_group_name,
        login=login,
        auth_type=auth_type_dataplane,
    )
    device_options = {
        "device_id": device_id,
        "edge_enabled": edge_enabled,
        "auth_method": auth_method,
        "primary_key": primary_key,
        "secondary_key": secondary_key,
        "primary_thumbprint": primary_thumbprint,
        "secondary_thumbprint": secondary_thumbprint,
        "status": status,
        "status_reason": status_reason,
        "valid_days": valid_days,
        "output_dir": output_dir,
        "device_scope": device_scope,
    }
    return _iot_device_create(target=hub_target, **device_options)


def _iot_device_create(
    target,
    device_id,
    edge_enabled=False,
    auth_method=DeviceAuthType.shared_private_key.value,
    primary_key=None,
    secondary_key=None,
    primary_thumbprint=None,
    secondary_thumbprint=None,
    status=EntityStatusType.enabled.value,
    status_reason=None,
    valid_days=None,
    output_dir=None,
    device_scope=None,
):
    """Assemble a device and create (or update) its identity on the hub.

    When ``valid_days`` or ``output_dir`` is given, a self-signed certificate
    is generated for ``device_id`` (validity defaults to 365 days) and its
    thumbprint replaces ``primary_thumbprint``.

    Raises:
        FileOperationError: ``output_dir`` is given but does not exist.
        InvalidArgumentValueError: a ``ValueError`` was raised while
            assembling or submitting the device.
    """
    service_sdk = SdkResolver(target=target).get_sdk(SdkType.service_sdk)

    if valid_days or output_dir:
        valid_days = int(valid_days) if valid_days else 365
        if output_dir and not exists(output_dir):
            raise FileOperationError(
                "certificate output directory of '{}' does not exist.".format(
                    output_dir
                )
            )
        generated_cert = _create_self_signed_cert(device_id, valid_days, output_dir)
        primary_thumbprint = generated_cert["thumbprint"]

    try:
        # Thumbprint auth carries thumbprints in the pk/sk slots; every other
        # method carries the symmetric keys.
        if auth_method == DeviceAuthType.x509_thumbprint.value:
            primary_secret = primary_thumbprint
        else:
            primary_secret = primary_key
        if auth_method == DeviceAuthType.x509_thumbprint.value:
            secondary_secret = secondary_thumbprint
        else:
            secondary_secret = secondary_key

        device = _assemble_device(
            is_update=False,
            device_id=device_id,
            auth_method=auth_method,
            edge_enabled=edge_enabled,
            pk=primary_secret,
            sk=secondary_secret,
            status=status,
            status_reason=status_reason,
            device_scope=device_scope,
        )
        output = service_sdk.devices.create_or_update_identity(
            id=device_id, device=device
        )
    except CloudError as e:
        handle_service_exception(e)
    except ValueError as ve:
        raise InvalidArgumentValueError(ve)

    return output


def _assemble_device(
    is_update,
    device_id,
    auth_method,
    edge_enabled,
    pk=None,
    sk=None,
    status=EntityStatusType.enabled.value,
    status_reason=None,
    device_scope=None,
):
    """Build the SDK ``Device`` model for a create or update request.

    A newly created edge device (``edge_enabled`` and not ``is_update``)
    receives ``parent_scopes`` (a one-element list holding ``device_scope``,
    or an empty list). In every other case ``device_scope`` is set directly.
    """
    from azext_iot.sdk.iothub.service.models import DeviceCapabilities, Device

    device_kwargs = {
        "device_id": device_id,
        "authentication": _assemble_auth(auth_method, pk, sk),
        "capabilities": DeviceCapabilities(iot_edge=edge_enabled),
        "status": status,
        "status_reason": status_reason,
    }
    if edge_enabled and not is_update:
        device_kwargs["parent_scopes"] = [device_scope] if device_scope else []
    else:
        device_kwargs["device_scope"] = device_scope
    return Device(**device_kwargs)


def _assemble_auth(auth_method, pk, sk):
    """Build the SDK ``AuthenticationMechanism`` for ``auth_method``.

    ``auth_method`` may be given as either the ``DeviceAuthType`` member name
    or the matching ``DeviceAuthApiType`` value. For symmetric key auth
    ``pk``/``sk`` are keys; for thumbprint auth they are thumbprints.

    Raises:
        ValueError: only one symmetric key was supplied, the primary
            thumbprint is missing, or ``auth_method`` is not recognized.
    """
    from azext_iot.sdk.iothub.service.models import (
        AuthenticationMechanism,
        SymmetricKey,
        X509Thumbprint,
    )

    if auth_method in (
        DeviceAuthType.shared_private_key.name,
        DeviceAuthApiType.sas.value,
    ):
        # Either both keys or neither; exactly one is rejected.
        if bool(pk) != bool(sk):
            raise ValueError("When configuring symmetric key auth both primary and secondary keys are required.")
        symmetric_key = SymmetricKey(primary_key=pk, secondary_key=sk)
        return AuthenticationMechanism(
            symmetric_key=symmetric_key,
            type=DeviceAuthApiType.sas.value,
        )

    if auth_method in (
        DeviceAuthType.x509_thumbprint.name,
        DeviceAuthApiType.selfSigned.value,
    ):
        if not pk:
            raise ValueError("When configuring selfSigned auth the primary thumbprint is required.")
        thumbprint = X509Thumbprint(primary_thumbprint=pk, secondary_thumbprint=sk)
        return AuthenticationMechanism(
            x509_thumbprint=thumbprint,
            type=DeviceAuthApiType.selfSigned.value,
        )

    if auth_method in (
        DeviceAuthType.x509_ca.name,
        DeviceAuthApiType.certificateAuthority.value,
    ):
        return AuthenticationMechanism(
            type=DeviceAuthApiType.certificateAuthority.value
        )

    raise ValueError("Authorization method {} invalid.".format(auth_method))


def _create_self_signed_cert(subject, valid_days, output_path=None):
    """Delegate to ``certops.create_self_signed_certificate`` and return its result.

    ``output_path`` is forwarded as the ``cert_output_dir`` argument.
    """
    from azext_iot.common.certops import create_self_signed_certificate

    certificate = create_self_signed_certificate(
        subject=subject,
        valid_days=valid_days,
        cert_output_dir=output_path,
    )
    return certificate


def update_iot_device_custom(
    instance,
    edge_enabled=None,
    status=None,
    status_reason=None,
    auth_method=None,
    primary_thumbprint=None,
    secondary_thumbprint=None,
    primary_key=None,
    secondary_key=None,
):
    """Apply the requested changes to a device identity dict and return it.

    ``instance`` is modified in place. When ``auth_method`` is given, the
    authentication type is switched and the matching credentials are written.
    Otherwise the supplied keys or thumbprints are validated against the
    device's current authentication type and only truthy values are written.

    Raises:
        RequiredArgumentMissingError: switching to sas with exactly one key,
            or switching to selfSigned with no thumbprint.
        ValueError: ``auth_method`` is not recognized, or credentials of the
            wrong kind were supplied for the current authentication type.
    """
    if edge_enabled is not None:
        instance["capabilities"]["iotEdge"] = edge_enabled
    for field, new_value in (("status", status), ("statusReason", status_reason)):
        if new_value is not None:
            instance[field] = new_value

    authentication = instance["authentication"]
    current_auth_type = authentication["type"]
    key_updates = (
        ("primaryKey", primary_key),
        ("secondaryKey", secondary_key),
    )
    thumbprint_updates = (
        ("primaryThumbprint", primary_thumbprint),
        ("secondaryThumbprint", secondary_thumbprint),
    )

    if auth_method is None:
        # if no new auth_method is provided, validate secondary auth arguments and update accordingly
        if current_auth_type == DeviceAuthApiType.sas.value:
            if primary_thumbprint or secondary_thumbprint:
                raise ValueError(
                    "Device authorization method {} does not support primary or secondary thumbprints.".format(
                        DeviceAuthType.shared_private_key.name
                    )
                )
            for field, value in key_updates:
                if value:
                    authentication["symmetricKey"][field] = value
        elif current_auth_type == DeviceAuthApiType.selfSigned.value:
            if primary_key or secondary_key:
                raise ValueError(
                    "Device authorization method {} does not support primary or secondary keys.".format(
                        DeviceAuthType.x509_thumbprint.name
                    )
                )
            for field, value in thumbprint_updates:
                if value:
                    authentication["x509Thumbprint"][field] = value
        return instance

    if auth_method == DeviceAuthType.shared_private_key.name:
        # Both keys or neither; both are written even when they are None.
        if bool(primary_key) != bool(secondary_key):
            raise RequiredArgumentMissingError("primary + secondary Key required with sas auth")
        for field, value in key_updates:
            authentication["symmetricKey"][field] = value
        new_auth_type = DeviceAuthApiType.sas.value
    elif auth_method == DeviceAuthType.x509_thumbprint.name:
        if not (primary_thumbprint or secondary_thumbprint):
            raise RequiredArgumentMissingError(
                "primary or secondary Thumbprint required with selfSigned auth"
            )
        for field, value in thumbprint_updates:
            if value:
                authentication["x509Thumbprint"][field] = value
        new_auth_type = DeviceAuthApiType.selfSigned.value
    elif auth_method == DeviceAuthType.x509_ca.name:
        new_auth_type = DeviceAuthApiType.certificateAuthority.value
    else:
        raise ValueError("Authorization method {} invalid.".format(auth_method))

    authentication["type"] = new_auth_type
    return instance


def iot_device_update(
    cmd,
    device_id,
    parameters,
    hub_name_or_hostname=None,
    resource_group_name=None,
    login=None,
    etag=None,
    auth_type_dataplane=None,
):
    """Update a device identity from the ``parameters`` dict.

    The authentication section is parsed with ``_parse_auth``, a ``Device``
    model is assembled in update mode, and its etag is set to ``etag`` (or
    ``"*"`` when none is given) before delegating to ``_iot_device_update``.
    """
    hub_target = IotHubDiscovery(cmd).get_target(
        resource_name=hub_name_or_hostname,
        resource_group_name=resource_group_name,
        login=login,
        auth_type=auth_type_dataplane,
    )

    auth, pk, sk = _parse_auth(parameters)
    updated_device = _assemble_device(
        is_update=True,
        device_id=parameters["deviceId"],
        auth_method=auth,
        edge_enabled=parameters["capabilities"]["iotEdge"],
        pk=pk,
        sk=sk,
        status=parameters["status"].lower(),
        status_reason=parameters.get("statusReason"),
        device_scope=parameters.get("deviceScope"),
    )
    updated_device.etag = etag or "*"
    return _iot_device_update(hub_target, device_id, updated_device)


def _iot_device_update(target, device_id, device):
    """Submit ``device`` to the hub with an ``If-Match`` header built from its etag.

    A ``CloudError`` is passed to ``handle_service_exception``.
    """
    service_sdk = SdkResolver(target=target).get_sdk(SdkType.service_sdk)

    try:
        if_match = {"If-Match": '"{}"'.format(device.etag)}
        return service_sdk.devices.create_or_update_identity(
            id=device_id, device=device, custom_headers=if_match
        )
    except CloudError as e:
        handle_service_exception(e)


def iot_device_delete(
    cmd,
    device_id,
    hub_name_or_hostname=None,
    resource_group_name=None,
    login=None,
    etag=None,
    auth_type_dataplane=None,
):
    """Delete the identity of ``device_id`` from the resolved IoT Hub.

    Resolves the hub target through ``IotHubDiscovery`` and delegates to
    ``_iot_device_delete``.
    """
    hub_target = IotHubDiscovery(cmd).get_target(
        resource_name=hub_name_or_hostname,
        resource_group_name=resource_group_name,
        login=login,
        auth_type=auth_type_dataplane,
    )
    return _iot_device_delete(target=hub_target, device_id=device_id, etag=etag)


def _iot_device_delete(target, device_id, etag=None):
    """Delete a device identity, matching on ``etag`` (``"*"`` when not given).

    Returns ``None``. A ``CloudError`` is passed to
    ``handle_service_exception``.
    """
    service_sdk = SdkResolver(target=target).get_sdk(SdkType.service_sdk)

    try:
        if_match = {"If-Match": '"{}"'.format(etag or "*")}
        service_sdk.devices.delete_identity(id=device_id, custom_headers=if_match)
        return None
    except CloudError as e:
        handle_service_exception(e)


def _update_device_key(target, device, auth_method, pk, sk, etag=None):
    """Replace the authentication of a device dict and submit it to the hub.

    ``device["authentication"]`` is overwritten with the mechanism built by
    ``_assemble_auth``. The request matches on ``etag`` (``"*"`` when not
    given). A ``CloudError`` is passed to ``handle_service_exception``.
    """
    service_sdk = SdkResolver(target=target).get_sdk(SdkType.service_sdk)

    try:
        device["authentication"] = _assemble_auth(auth_method, pk, sk)
        if_match = {"If-Match": '"{}"'.format(etag or "*")}
        return service_sdk.devices.create_or_update_identity(
            id=device["deviceId"],
            device=device,
            custom_headers=if_match,
        )
    except CloudError as e:
        handle_service_exception(e)


def iot_device_key_regenerate(
    cmd,
    hub_name_or_hostname,
    device_ids,
    renew_key_type,
    include_modules=False,
    no_progress=False,
    resource_group_name=None,
    login=None,
    etag=None,
    auth_type_dataplane=None,
):
    """Swap or bulk-regenerate symmetric keys for one or more devices.

    * Swap: only supported for a single, explicitly named sas device; its
      primary and secondary keys are exchanged via ``_update_device_key``.
    * Primary/secondary: ``"Key"`` is appended to ``renew_key_type`` and the
      targets are passed to ``_iot_key_regenerate_batch``. A ``device_ids``
      of ``["*"]`` selects every sas device found through the twin listings,
      plus their modules when ``include_modules`` is set.

    For a single named device without ``include_modules`` the refreshed
    device identity is returned; otherwise the batch result is returned.

    Raises:
        InvalidArgumentValueError: swap requested for several devices or "*".
        ClientRequestError: swap requested for a non-sas device.
    """
    target = IotHubDiscovery(cmd).get_target(
        resource_name=hub_name_or_hostname,
        resource_group_name=resource_group_name,
        login=login,
        auth_type=auth_type_dataplane,
    )

    if renew_key_type == RenewKeyType.swap.value:
        if len(device_ids) > 1 or device_ids[0] == "*":
            raise InvalidArgumentValueError(
                "Currently, bulk key swap is not supported."
            )
        device = _iot_device_show(target, device_ids[0])
        if device["authentication"]["type"] != DeviceAuthApiType.sas.value:
            raise ClientRequestError("Device authentication should be of type sas")

        current_primary = device["authentication"]["symmetricKey"]["primaryKey"]
        current_secondary = device["authentication"]["symmetricKey"]["secondaryKey"]

        # Pass the keys in exchanged positions to perform the swap.
        return _update_device_key(
            target,
            device,
            device["authentication"]["type"],
            current_secondary,
            current_primary,
            etag,
        )

    service_sdk = SdkResolver(target=target).get_sdk(SdkType.service_sdk)
    if renew_key_type in (RenewKeyType.primary.value, RenewKeyType.secondary.value):
        renew_key_type = renew_key_type + "Key"

    modules = []
    if device_ids[0] == "*":
        devices = []
        device_twins = _iot_device_twin_list(target=target, top=None)
        device_twins.extend(_iot_device_twin_list(target=target, edge_enabled=True, top=None))
        for twin in device_twins:
            if twin["authenticationType"] == DeviceAuthApiType.sas.value:
                devices.append({"id": twin["deviceId"]})
            # non sas devices can have sas modules...
            if include_modules:
                modules.extend(
                    _iot_key_regenerate_process_modules(
                        target=target, device_id=twin["deviceId"], module_ids="*"
                    )
                )
    else:
        devices = [{"id": device_id} for device_id in device_ids]

    if modules:
        logger.info(f"Found {len(modules)} modules.")

    result = _iot_key_regenerate_batch(
        service_sdk=service_sdk,
        renew_key_type=renew_key_type,
        items=devices + modules,
        no_progress=no_progress
    )

    # avoid breaking changes by having one device return the device identity
    single_named_device = (
        len(device_ids) == 1 and device_ids[0] != "*" and not include_modules
    )
    if single_named_device:
        return _iot_device_show(target, device_ids[0])

    return result


def iot_device_get_parent(
    cmd,
    device_id,
    hub_name_or_hostname=None,
    resource_group_name=None,
    login=None,
    auth_type_dataplane=None,
):
    """Return the identity of the parent device of ``device_id``.

    The parent id is cut out of the child's first parent scope: the text
    after ``DEVICE_DEVICESCOPE_PREFIX`` and before the last ``"-"``.
    """
    target = IotHubDiscovery(cmd).get_target(
        resource_name=hub_name_or_hostname,
        resource_group_name=resource_group_name,
        login=login,
        auth_type=auth_type_dataplane,
    )
    child_device = _iot_device_show(target, device_id)
    _validate_child_device(child_device)

    parent_scope = child_device["parentScopes"][0]
    id_start = len(DEVICE_DEVICESCOPE_PREFIX)
    id_end = parent_scope.rindex("-")
    return _iot_device_show(target, parent_scope[id_start:id_end])


def iot_device_set_parent(
    cmd,
    device_id,
    parent_id,
    force=False,
    hub_name_or_hostname=None,
    resource_group_name=None,
    login=None,
    auth_type_dataplane=None,
):
    """Assign ``parent_id`` as the parent of ``device_id``.

    Resolves the hub target through ``IotHubDiscovery`` and delegates to
    ``_iot_device_set_parent``. Nothing is returned.
    """
    hub_target = IotHubDiscovery(cmd).get_target(
        resource_name=hub_name_or_hostname,
        resource_group_name=resource_group_name,
        login=login,
        auth_type=auth_type_dataplane,
    )
    _iot_device_set_parent(
        target=hub_target, parent_id=parent_id, device_id=device_id, force=force
    )


def _iot_device_set_parent(target, parent_id, device_id, force=False):
    """Validate both devices, then point the child at the parent's device scope.

    The parent must be an edge device. An existing parent on the child is
    only accepted when ``force`` is set.
    """
    parent_device = _iot_device_show(target, parent_id)
    _validate_edge_device(parent_device)
    child_device = _iot_device_show(target, device_id)
    _validate_parent_child_relation(child_device, force)

    child_is_edge = child_device["capabilities"]["iotEdge"]
    parent_scope = parent_device["deviceScope"]
    _update_device_parent(target, child_device, child_is_edge, parent_scope)


def iot_device_children_add(
    cmd,
    device_id,
    child_list,
    force=False,
    hub_name_or_hostname=None,
    resource_group_name=None,
    login=None,
    auth_type_dataplane=None,
):
    """Add the devices in ``child_list`` as children of edge device ``device_id``.

    Resolves the hub target through ``IotHubDiscovery`` and delegates to
    ``_iot_device_children_add``.
    """
    hub_target = IotHubDiscovery(cmd).get_target(
        resource_name=hub_name_or_hostname,
        resource_group_name=resource_group_name,
        login=login,
        auth_type=auth_type_dataplane,
    )
    return _iot_device_children_add(
        target=hub_target, device_id=device_id, child_list=child_list, force=force
    )


def _iot_device_children_add(
    target,
    device_id,
    child_list,
    force=False
):
    """Validate every child first, then assign each to the edge device's scope.

    All children are fetched and validated before any update is sent, so a
    validation failure raises before any device has been updated.
    """
    edge_device = _iot_device_show(target, device_id)
    _validate_edge_device(edge_device)

    validated_children = []
    for child_id in child_list:
        candidate = _iot_device_show(target, child_id.strip())
        _validate_parent_child_relation(candidate, force)
        validated_children.append(candidate)

    for child in validated_children:
        _update_device_parent(
            target,
            child,
            child["capabilities"]["iotEdge"],
            edge_device["deviceScope"],
        )


def iot_device_children_remove(
    cmd,
    device_id,
    child_list=None,
    remove_all=False,
    hub_name_or_hostname=None,
    resource_group_name=None,
    login=None,
    auth_type_dataplane=None,
):
    """Detach children from edge device ``device_id``.

    With ``remove_all`` every child returned by ``_iot_device_children_list``
    is detached. Otherwise each device in ``child_list`` must currently be a
    child of ``device_id``.

    Raises:
        ClientRequestError: no children exist (``remove_all``), or a listed
            device is not a child of ``device_id``.
        RequiredArgumentMissingError: neither ``child_list`` nor
            ``remove_all`` was supplied.
    """
    target = IotHubDiscovery(cmd).get_target(
        resource_name=hub_name_or_hostname,
        resource_group_name=resource_group_name,
        login=login,
        auth_type=auth_type_dataplane,
    )

    if remove_all:
        result = _iot_device_children_list(target, device_id)
        if not result:
            raise ClientRequestError(
                'No registered child devices found for "{}" edge device.'.format(
                    device_id
                )
            )
        child_ids = [str(entry["deviceId"]) for entry in result]
        devices = [
            _iot_device_show(target, child_id.strip()) for child_id in child_ids
        ]
    elif child_list:
        edge_device = _iot_device_show(target, device_id)
        _validate_edge_device(edge_device)
        devices = []
        for child_device_id in child_list:
            child_device = _iot_device_show(target, child_device_id.strip())
            _validate_child_device(child_device)
            if child_device["parentScopes"] != [edge_device["deviceScope"]]:
                raise ClientRequestError(
                    'The entered child device "{}" isn\'t assigned as a child of edge device "{}"'.format(
                        child_device_id.strip(), device_id
                    )
                )
            devices.append(child_device)
    else:
        raise RequiredArgumentMissingError(
            "Please specify child list or use --remove-all to remove all children."
        )

    # No device scope is passed, which clears the parent link.
    for device in devices:
        _update_device_parent(target, device, device["capabilities"]["iotEdge"])


def iot_device_children_list(
    cmd,
    device_id,
    hub_name_or_hostname=None,
    resource_group_name=None,
    login=None,
    auth_type_dataplane=None,
):
    """Return the device ids of all children of edge device ``device_id``."""
    hub_target = IotHubDiscovery(cmd).get_target(
        resource_name=hub_name_or_hostname,
        resource_group_name=resource_group_name,
        login=login,
        auth_type=auth_type_dataplane,
    )
    children = _iot_device_children_list(hub_target, device_id)

    return [child["deviceId"] for child in children]


def _iot_device_children_list(target, device_id):
    """Query the hub for devices whose ``parentScopes`` contain this device's scope.

    ``device_id`` must name an edge device; the query result is returned as
    produced by ``_iot_query``.
    """
    edge_device = _iot_device_show(target, device_id)
    _validate_edge_device(edge_device)

    query_template = (
        "select deviceId from devices where array_contains(parentScopes, '{}')"
    )
    query = query_template.format(edge_device["deviceScope"])

    # TODO: Inefficient
    return _iot_query(target, query)


def _update_device_parent(target, device, is_edge, device_scope=None):
    """Write the parent link into ``device`` and submit it, matching on its etag.

    Edge devices get ``parentScopes`` (a one-element list, or an empty list
    when no scope is given). Other devices get ``deviceScope`` (an empty
    string when no scope is given).

    Raises:
        CLIInternalError: the device has no etag, or another ``LookupError``
            occurred while preparing or sending the request.
    """
    service_sdk = SdkResolver(target=target).get_sdk(SdkType.service_sdk)

    try:
        if is_edge:
            device["parentScopes"] = [device_scope] if device_scope else []
        else:
            device["deviceScope"] = device_scope if device_scope else ""

        etag = device.get("etag", None)
        if not etag:
            raise LookupError("device etag not found.")

        if_match = {"If-Match": '"{}"'.format(etag)}
        service_sdk.devices.create_or_update_identity(
            id=device["deviceId"],
            device=device,
            custom_headers=if_match,
        )
        return
    except CloudError as e:
        handle_service_exception(e)
    except LookupError as err:
        raise CLIInternalError(err)


def _validate_edge_device(device):
    if not device["capabilities"]["iotEdge"]:
        raise ClientRequestError(
            'The device "{}" should be an edge device.'.format(device["deviceId"])
        )


def _validate_child_device(device):
    if "parentScopes" not in device:
        raise ClientRequestError(
            'Device "{}" doesn\'t support parent device functionality.'.format(
                device["deviceId"]
            )
        )
    if not device["parentScopes"]:
        raise ClientRequestError(
            'Device "{}" doesn\'t have any parent device.'.format(device["deviceId"])
        )


def _validate_parent_child_relation(child_device, force):
    if "parentScopes" not in child_device or child_device["parentScopes"] == []:
        return
    else:
        if not force:
            raise ClientRequestError(
                "The entered device \"{}\" already has a parent device, please use '--force'"
                " to overwrite".format(child_device["deviceId"])
            )
        return


# Module


def iot_device_module_create(
    cmd,
    device_id,
    module_id,
    hub_name_or_hostname=None,
    auth_method=DeviceAuthType.shared_private_key.value,
    primary_key=None,
    secondary_key=None,
    primary_thumbprint=None,
    secondary_thumbprint=None,
    valid_days=None,
    output_dir=None,
    resource_group_name=None,
    login=None,
    auth_type_dataplane=None,
):
    """Create a module identity on a device of the resolved IoT Hub.

    When ``valid_days`` or ``output_dir`` is given, a self-signed certificate
    is generated for ``module_id`` (validity defaults to 365 days) and its
    thumbprint replaces ``primary_thumbprint``. This happens before the hub
    target is resolved.

    Raises:
        FileOperationError: ``output_dir`` is given but does not exist.
    """
    if valid_days or output_dir:
        valid_days = int(valid_days) if valid_days else 365
        if output_dir and not exists(output_dir):
            raise FileOperationError(
                "certificate output directory of '{}' does not exist.".format(
                    output_dir
                )
            )
        generated_cert = _create_self_signed_cert(module_id, valid_days, output_dir)
        primary_thumbprint = generated_cert["thumbprint"]

    hub_target = IotHubDiscovery(cmd).get_target(
        resource_name=hub_name_or_hostname,
        resource_group_name=resource_group_name,
        login=login,
        auth_type=auth_type_dataplane,
    )
    module_options = {
        "device_id": device_id,
        "module_id": module_id,
        "auth_method": auth_method,
        "primary_key": primary_key,
        "secondary_key": secondary_key,
        "primary_thumbprint": primary_thumbprint,
        "secondary_thumbprint": secondary_thumbprint,
    }
    return _iot_device_module_create(target=hub_target, **module_options)


def _iot_device_module_create(
    target,
    device_id,
    module_id,
    auth_method=DeviceAuthType.shared_private_key.value,
    primary_key=None,
    secondary_key=None,
    primary_thumbprint=None,
    secondary_thumbprint=None
):
    """Assemble a module and create (or update) its identity on the hub.

    Thumbprints are used as credentials for x509 thumbprint auth; symmetric
    keys are used for every other method.

    Raises:
        InvalidArgumentValueError: a ``ValueError`` was raised while
            assembling or submitting the module.
    """
    service_sdk = SdkResolver(target=target).get_sdk(SdkType.service_sdk)

    try:
        if auth_method == DeviceAuthType.x509_thumbprint.value:
            primary_secret = primary_thumbprint
        else:
            primary_secret = primary_key
        if auth_method == DeviceAuthType.x509_thumbprint.value:
            secondary_secret = secondary_thumbprint
        else:
            secondary_secret = secondary_key

        module = _assemble_module(
            device_id=device_id,
            module_id=module_id,
            auth_method=auth_method,
            pk=primary_secret,
            sk=secondary_secret,
        )
        return service_sdk.modules.create_or_update_identity(
            id=device_id, mid=module_id, module=module
        )
    except CloudError as e:
        handle_service_exception(e)
    except ValueError as ve:
        raise InvalidArgumentValueError(ve)


def _assemble_module(device_id, module_id, auth_method, pk=None, sk=None):
    """Build the SDK ``Module`` model with authentication from ``_assemble_auth``."""
    from azext_iot.sdk.iothub.service.models import Module

    return Module(
        module_id=module_id,
        device_id=device_id,
        authentication=_assemble_auth(auth_method, pk, sk),
    )


def iot_device_module_update(
    cmd,
    device_id,
    module_id,
    parameters,
    hub_name_or_hostname=None,
    resource_group_name=None,
    login=None,
    etag=None,
    auth_type_dataplane=None,
):
    """Update a module identity from the ``parameters`` dict.

    The module model is rebuilt by ``_handle_module_update_params`` and
    submitted with an ``If-Match`` header of ``etag`` (``"*"`` when not
    given). A ``CloudError`` is passed to ``handle_service_exception``.
    """
    hub_target = IotHubDiscovery(cmd).get_target(
        resource_name=hub_name_or_hostname,
        resource_group_name=resource_group_name,
        login=login,
        auth_type=auth_type_dataplane,
    )
    service_sdk = SdkResolver(target=hub_target).get_sdk(SdkType.service_sdk)

    try:
        updated_module = _handle_module_update_params(parameters)
        if_match = {"If-Match": '"{}"'.format(etag or "*")}
        return service_sdk.modules.create_or_update_identity(
            id=device_id,
            mid=module_id,
            module=updated_module,
            custom_headers=if_match,
        )
    except CloudError as e:
        handle_service_exception(e)


def _handle_module_update_params(parameters):
    """Turn a module ``parameters`` dict into an SDK ``Module`` model.

    Authentication details come from ``_parse_auth``; the device and module
    ids are read from the ``"deviceId"`` and ``"moduleId"`` keys.
    """
    auth_type, primary, secondary = _parse_auth(parameters)
    module = _assemble_module(
        device_id=parameters["deviceId"],
        module_id=parameters["moduleId"],
        auth_method=auth_type,
        pk=primary,
        sk=secondary,
    )
    return module


def _parse_auth(parameters):
    """Extract ``(auth_type, primary, secondary)`` from an identity dict.

    For sas the pair is the symmetric keys, for selfSigned the thumbprints,
    and for certificateAuthority both are ``None``.

    Raises:
        InvalidArgumentValueError: ``authentication.type`` is not one of the
            supported API auth types.
        RequiredArgumentMissingError: selfSigned auth with neither
            thumbprint set.
    """
    valid_auth = [
        DeviceAuthApiType.sas.value,
        DeviceAuthApiType.selfSigned.value,
        DeviceAuthApiType.certificateAuthority.value,
    ]
    authentication = parameters["authentication"]
    auth = authentication.get("type")
    if auth not in valid_auth:
        raise InvalidArgumentValueError("authentication.type must be one of {}".format(valid_auth))

    if auth == DeviceAuthApiType.sas.value:
        symmetric_key = authentication["symmetricKey"]
        return auth, symmetric_key["primaryKey"], symmetric_key["secondaryKey"]

    if auth == DeviceAuthApiType.selfSigned.value:
        thumbprints = authentication["x509Thumbprint"]
        pk = thumbprints["primaryThumbprint"]
        sk = thumbprints["secondaryThumbprint"]
        # NOTE(review): the message says "primary + secondary" but the check
        # only fails when both are missing - confirm which is intended.
        if not (pk or sk):
            raise RequiredArgumentMissingError(
                "primary + secondary Thumbprint required with selfSigned auth"
            )
        return auth, pk, sk

    return auth, None, None


def iot_device_module_key_regenerate(
    cmd,
    hub_name_or_hostname,
    device_id,
    module_ids,
    renew_key_type,
    no_progress=None,
    resource_group_name=None,
    login=None,
    etag=None,
    auth_type_dataplane=None,
):
    """Swap or bulk-regenerate symmetric keys for modules of ``device_id``.

    * Swap: only supported for a single, explicitly named sas module; its
      primary and secondary keys are exchanged and the module is re-submitted.
    * Primary/secondary: ``"Key"`` is appended to ``renew_key_type`` and the
      modules selected by ``_iot_key_regenerate_process_modules`` are passed
      to ``_iot_key_regenerate_batch``.

    For a single named module the refreshed module identity is returned;
    otherwise the batch result is returned.

    Raises:
        InvalidArgumentValueError: swap requested for several modules or "*".
        ClientRequestError: swap requested for a non-sas module.
    """
    target = IotHubDiscovery(cmd).get_target(
        resource_name=hub_name_or_hostname,
        resource_group_name=resource_group_name,
        login=login,
        auth_type=auth_type_dataplane,
    )
    service_sdk = SdkResolver(target=target).get_sdk(SdkType.service_sdk)

    if renew_key_type == RenewKeyType.swap.value:
        if len(module_ids) > 1 or module_ids[0] == "*":
            raise InvalidArgumentValueError(
                "Currently, bulk key swap is not supported."
            )
        module = _iot_device_module_show(target=target, device_id=device_id, module_id=module_ids[0])
        if module["authentication"]["type"] != DeviceAuthApiType.sas.value:
            raise ClientRequestError("Module authentication should be of type sas")

        symmetric_key = module["authentication"]["symmetricKey"]
        old_primary = symmetric_key["primaryKey"]
        old_secondary = symmetric_key["secondaryKey"]
        symmetric_key["primaryKey"] = old_secondary
        symmetric_key["secondaryKey"] = old_primary

        if_match = {"If-Match": '"{}"'.format(etag or "*")}
        try:
            return service_sdk.modules.create_or_update_identity(
                id=device_id,
                mid=module_ids[0],
                module=module,
                custom_headers=if_match,
            )
        except CloudError as e:
            handle_service_exception(e)

    if renew_key_type in (RenewKeyType.primary.value, RenewKeyType.secondary.value):
        renew_key_type = renew_key_type + "Key"

    modules = _iot_key_regenerate_process_modules(
        target, device_id, module_ids
    )
    result = _iot_key_regenerate_batch(
        service_sdk=service_sdk,
        renew_key_type=renew_key_type,
        items=modules,
        device_id=device_id,
        no_progress=no_progress
    )

    single_named_module = len(module_ids) == 1 and module_ids[0] != "*"
    if single_named_module:
        return _iot_device_module_show(target=target, device_id=device_id, module_id=module_ids[0])
    return result


def _iot_key_regenerate_process_modules(
    target,
    device_id,
    module_ids,
):
    if module_ids[0] == "*":
        modules = _iot_device_module_list(target=target, device_id=device_id, top=None)
        module_ids = []
        for module in modules:
            # not going to question why the call the Module object - just making things
            # easier for unit tests
            if not isinstance(module, dict):
                module = module.serialize()
            if module["authentication"]["type"] == DeviceAuthApiType.sas.value:
                module_ids.append(module["moduleId"])

    return [{"id": device_id, "moduleId": module_id} for module_id in module_ids]


def _iot_key_regenerate_batch(
    service_sdk,
    renew_key_type,
    items,
    device_id=None,
    no_progress=False,
):
    """Regenerate keys in bulk, sending ``items`` to the service in chunks.

    ``items`` is split into chunks of at most IOTHUB_RENEW_KEY_BATCH_SIZE. Each
    chunk is attempted up to IOTHUB_THROTTLE_MAX_TRIES times: a CloudError whose
    status code is THROTTLE_HTTP_STATUS_CODE is retried after sleeping
    IOTHUB_THROTTLE_SLEEP_SEC seconds; any other CloudError, or the last failed
    attempt, is passed to handle_service_exception (after logging the keys that
    were already rotated, if any).

    Returns ``{}`` when ``items`` is empty, otherwise a dict with "policyKey",
    "errors" and "rotatedKeys" accumulated over all chunks.
    """
    from time import sleep

    summary = {
        "policyKey": renew_key_type,
        "errors": [],
        "rotatedKeys": [],
    }
    scope = f"modules for device {device_id}" if device_id else "devices"
    logger.info(f"Found {len(items)} {scope}.")
    if not items:
        return {}

    chunks = [
        items[start:start + IOTHUB_RENEW_KEY_BATCH_SIZE]
        for start in range(0, len(items), IOTHUB_RENEW_KEY_BATCH_SIZE)
    ]

    progress = tqdm(
        chunks,
        desc="Bulk key regeneration is in progress",
        ascii=' #',
        disable=no_progress,
    )
    for chunk in progress:
        response = None
        for attempt in range(1, IOTHUB_THROTTLE_MAX_TRIES + 1):
            try:
                response = service_sdk.service.bulk_regenerate_device_key_method(
                    policy_key=renew_key_type,
                    devices=chunk,
                )
                break
            except CloudError as e:
                # Give up on the final attempt or on any non-throttling error.
                if attempt == IOTHUB_THROTTLE_MAX_TRIES or e.status_code != THROTTLE_HTTP_STATUS_CODE:
                    if summary["rotatedKeys"]:
                        logger.warning(
                            f"Managed to renew the following keys:\n{summary['rotatedKeys']}"
                        )
                    handle_service_exception(e)
                sleep(IOTHUB_THROTTLE_SLEEP_SEC)

        # Fold this chunk's outcome into the running summary.
        if response.errors:
            summary["errors"].extend(response.errors)
        if response.rotated_keys:
            summary["rotatedKeys"].extend(response.rotated_keys)
    return summary


def iot_device_module_list(
    cmd,
    device_id,
    hub_name_or_hostname=None,
    top=1000,
    resource_group_name=None,
    login=None,
    auth_type_dataplane=None,
):
    # Command entry point: resolve the hub target from the CLI arguments, then
    # return the device's modules (capped at `top`) via _iot_device_module_list.
    hub_target = IotHubDiscovery(cmd).get_target(
        resource_name=hub_name_or_hostname,
        resource_group_name=resource_group_name,
        login=login,
        auth_type=auth_type_dataplane,
    )
    return _iot_device_module_list(target=hub_target, device_id=device_id, top=top)


def _iot_device_module_list(target, device_id, top=1000):
    """Return the modules of ``device_id``, sliced to the first ``top`` entries.

    The service result is sliced with ``[:top]``, so ``top=None`` keeps every
    entry. A CloudError is passed to handle_service_exception.
    """
    service_sdk = SdkResolver(target=target).get_sdk(SdkType.service_sdk)

    try:
        all_modules = service_sdk.modules.get_modules_on_device(device_id)
        return all_modules[:top]
    except CloudError as e:
        handle_service_exception(e)


def iot_device_module_show(
    cmd,
    device_id,
    module_id,
    hub_name_or_hostname=None,
    resource_group_name=None,
    login=None,
    auth_type_dataplane=None,
):
    # Command entry point: resolve the hub target from the CLI arguments, then
    # return the module identity produced by _iot_device_module_show.
    hub_target = IotHubDiscovery(cmd).get_target(
        resource_name=hub_name_or_hostname,
        resource_group_name=resource_group_name,
        login=login,
        auth_type=auth_type_dataplane,
    )
    return _iot_device_module_show(
        target=hub_target, device_id=device_id, module_id=module_id
    )


def _iot_device_module_show(target, device_id, module_id):
    """Fetch a module identity as a dict and tag it with its hub.

    The raw JSON body of ``modules.get_identity`` is returned with an extra
    "hub" key set to ``target.get("entity")``. A CloudError is passed to
    handle_service_exception.
    """
    service_sdk = SdkResolver(target=target).get_sdk(SdkType.service_sdk)

    try:
        raw_result = service_sdk.modules.get_identity(
            id=device_id, mid=module_id, raw=True
        )
        identity = raw_result.response.json()
        identity["hub"] = target.get("entity")
        return identity
    except CloudError as e:
        handle_service_exception(e)


def iot_device_module_delete(
    cmd,
    device_id,
    module_id,
    hub_name_or_hostname=None,
    resource_group_name=None,
    login=None,
    etag=None,
    auth_type_dataplane=None,
):
    # Command entry point: delete a module identity from a device.
    # The If-Match header carries the quoted etag, or "*" when no etag is given.
    # Nothing is returned; a CloudError is passed to handle_service_exception.
    hub_target = IotHubDiscovery(cmd).get_target(
        resource_name=hub_name_or_hostname,
        resource_group_name=resource_group_name,
        login=login,
        auth_type=auth_type_dataplane,
    )
    service_sdk = SdkResolver(target=hub_target).get_sdk(SdkType.service_sdk)

    try:
        if_match = {"If-Match": '"{}"'.format(etag if etag else "*")}
        service_sdk.modules.delete_identity(
            id=device_id, mid=module_id, custom_headers=if_match
        )
    except CloudError as e:
        handle_service_exception(e)


def iot_device_module_twin_show(
    cmd,
    device_id,
    module_id,
    hub_name_or_hostname=None,
    resource_group_name=None,
    login=None,
    auth_type_dataplane=None,
):
    # Command entry point: resolve the hub target from the CLI arguments, then
    # return the module twin produced by _iot_device_module_twin_show.
    discovery_args = {
        "resource_name": hub_name_or_hostname,
        "resource_group_name": resource_group_name,
        "login": login,
        "auth_type": auth_type_dataplane,
    }
    hub_target = IotHubDiscovery(cmd).get_target(**discovery_args)
    return _iot_device_module_twin_show(
        target=hub_target, device_id=device_id, module_id=module_id
    )


def _iot_device_module_twin_show(target, device_id, module_id):
    """Return the module twin as the parsed JSON body of ``modules.get_twin``.

    A CloudError is passed to handle_service_exception.
    """
    service_sdk = SdkResolver(target=target).get_sdk(SdkType.service_sdk)

    try:
        raw_result = service_sdk.modules.get_twin(
            id=device_id, mid=module_id, raw=True
        )
        return raw_result.response.json()
    except CloudError as e:
        handle_service_exception(e)


def iot_device_module_twin_update(
    cmd,
    device_id,
    module_id,
    parameters,
    hub_name_or_hostname=None,
    resource_group_name=None,
    login=None,
    etag=None,
    auth_type_dataplane=None,
):
    # Command entry point: resolve the hub target from the CLI arguments, then
    # apply `parameters` to the module twin via _iot_device_module_twin_update.
    hub_target = IotHubDiscovery(cmd).get_target(
        resource_name=hub_name_or_hostname,
        resource_group_name=resource_group_name,
        login=login,
        auth_type=auth_type_dataplane,
    )
    return _iot_device_module_twin_update(
        target=hub_target,
        device_id=device_id,
        module_id=module_id,
        parameters=parameters,
        etag=etag,
    )


def _iot_device_module_twin_update(target, device_id, module_id, parameters, etag=None):
    """Update a module twin with ``parameters``.

    Before the request, ``verify_transform`` is run over ``parameters`` with
    "properties.desired" and/or "tags" expected to be dicts, but only for the
    sections that are present and truthy. The If-Match header carries the
    quoted etag, or "*" when no etag is given.

    A CloudError is passed to handle_service_exception; AttributeError and
    TypeError are re-raised as CLIInternalError.
    """
    from azext_iot.common.utility import verify_transform

    service_sdk = SdkResolver(target=target).get_sdk(SdkType.service_sdk)

    try:
        if_match = {"If-Match": '"{}"'.format(etag if etag else "*")}

        expected_types = {}
        properties = parameters.get("properties")
        if properties and properties.get("desired"):
            expected_types["properties.desired"] = dict
        if parameters.get("tags"):
            expected_types["tags"] = dict
        verify_transform(parameters, expected_types)

        return service_sdk.modules.update_twin(
            id=device_id,
            mid=module_id,
            device_twin_info=parameters,
            custom_headers=if_match,
        )
    except CloudError as e:
        handle_service_exception(e)
    except (AttributeError, TypeError) as err:
        raise CLIInternalError(err)


def iot_device_module_twin_replace(
    cmd,
    device_id,
    module_id,
    target_json,
    hub_name_or_hostname=None,
    resource_group_name=None,
    login=None,
    etag=None,
    auth_type_dataplane=None,
):
    # Command entry point: resolve the hub target from the CLI arguments, then
    # replace the module twin with `target_json` via
    # _iot_device_module_twin_replace.
    hub_target = IotHubDiscovery(cmd).get_target(
        resource_name=hub_name_or_hostname,
        resource_group_name=resource_group_name,
        login=login,
        auth_type=auth_type_dataplane,
    )
    return _iot_device_module_twin_replace(
        target=hub_target,
        device_id=device_id,
        module_id=module_id,
        target_json=target_json,
        etag=etag,
    )


def _iot_device_module_twin_replace(target, device_id, module_id, target_json, etag=None):
    """Replace a module twin with the document given in ``target_json``.

    ``target_json`` is first run through ``process_json_arg``. The If-Match
    header carries the quoted etag, or "*" when no etag is given. A CloudError
    is passed to handle_service_exception.
    """
    service_sdk = SdkResolver(target=target).get_sdk(SdkType.service_sdk)

    try:
        twin_document = process_json_arg(target_json, argument_name="json")
        if_match = {"If-Match": '"{}"'.format(etag if etag else "*")}
        return service_sdk.modules.replace_twin(
            id=device_id,
            mid=module_id,
            device_twin_info=twin_document,
            custom_headers=if_match,
        )
    except CloudError as e:
        handle_service_exception(e)


def iot_edge_set_modules(
    cmd,
    device_id,
    content,
    hub_name_or_hostname=None,
    resource_group_name=None,
    login=None,
    auth_type_dataplane=None,
):
    # Command entry point: resolve the hub target from the CLI arguments, then
    # apply `content` to the device via _iot_edge_set_modules.
    hub_target = IotHubDiscovery(cmd).get_target(
        resource_name=hub_name_or_hostname,
        resource_group_name=resource_group_name,
        login=login,
        auth_type=auth_type_dataplane,
    )
    return _iot_edge_set_modules(
        target=hub_target, device_id=device_id, content=content
    )


def _iot_edge_set_modules(target, device_id, content):
    """Apply an edge configuration payload to one device.

    ``content`` is parsed with ``process_json_arg``, processed as
    ``ConfigType.edge`` by ``_process_config_content``, wrapped in a
    ``ConfigurationContent`` and applied to the device. The device's module
    list (default cap) is returned afterwards. A CloudError is passed to
    handle_service_exception.
    """
    from azext_iot.sdk.iothub.service.models import ConfigurationContent

    service_sdk = SdkResolver(target=target).get_sdk(SdkType.service_sdk)

    try:
        payload = process_json_arg(content, argument_name="content")
        edge_content = _process_config_content(payload, config_type=ConfigType.edge)

        service_sdk.configuration.apply_on_edge_device(
            id=device_id, content=ConfigurationContent(**edge_content)
        )
        return _iot_device_module_list(target, device_id)
    except CloudError as e:
        handle_service_exception(e)


def iot_edge_export_modules(
    cmd,
    device_id,
    hub_name_or_hostname=None,
    resource_group_name=None,
    login=None,
    auth_type_dataplane=None,
):
    # Command entry point: export the module twins of a device as a
    # {"content": {"modulesContent": ...}} configuration document.
    # A CloudError is passed to handle_service_exception.
    discovery = IotHubDiscovery(cmd)
    target = discovery.get_target(
        resource_name=hub_name_or_hostname,
        resource_group_name=resource_group_name,
        login=login,
        auth_type=auth_type_dataplane,
    )

    try:
        # List the modules with the target resolved above. Going back through
        # the public iot_device_module_list command would run discovery a
        # second time and, as previously called, without resource_group_name
        # and auth_type_dataplane.
        module_list = _iot_device_module_list(target=target, device_id=device_id)

        # Fetch the twin of every listed module by its module id.
        module_twin_list = [
            _iot_device_module_twin_show(
                target=target, device_id=device_id, module_id=module.module_id
            )
            for module in module_list
        ]

        # Turn module twins list into module twin configuration
        return _build_edge_modules_configuration(module_twin_list)
    except CloudError as e:
        handle_service_exception(e)


def _build_edge_modules_configuration(module_twin_list):
    modulesContent = {}
    for module_twin in module_twin_list:
        moduleId = module_twin["moduleId"]
        desiredProperties = module_twin["properties"]["desired"]
        # Add desired properties from module twin except $metadata and $version
        if desiredProperties:
            desiredProperties.pop("$metadata")
            desiredProperties.pop("$version")
            modulesContent[moduleId] = {"properties.desired": desiredProperties}

    return {"content": {"modulesContent": modulesContent}}


def iot_edge_deployment_create(
    cmd,
    config_id,
    content,
    custom_labels=None,
    custom_metric_queries=None,
    hub_name_or_hostname=None,
    target_condition="",
    priority=0,
    labels=None,
    metrics=None,
    layered=False,
    no_validation=False,
    resource_group_name=None,
    login=None,
    auth_type_dataplane=None,
):
    # Command entry point: create an edge deployment through
    # _iot_hub_configuration_create.
    #
    # Short-term fix for --no-validation: it is handled like a layered
    # deployment, so either flag selects ConfigType.layered instead of
    # ConfigType.edge.
    if layered or no_validation:
        deployment_type = ConfigType.layered
    else:
        deployment_type = ConfigType.edge

    hub_target = IotHubDiscovery(cmd).get_target(
        resource_name=hub_name_or_hostname,
        resource_group_name=resource_group_name,
        login=login,
        auth_type=auth_type_dataplane,
    )
    return _iot_hub_configuration_create(
        target=hub_target,
        config_id=config_id,
        content=content,
        config_type=deployment_type,
        custom_labels=custom_labels,
        custom_metric_queries=custom_metric_queries,
        target_condition=target_condition,
        priority=priority,
        labels=labels,
        metrics=metrics,
    )


def iot_hub_configuration_create(
    cmd,
    config_id,
    content,
    custom_labels=None,
    custom_metric_queries=None,
    hub_name_or_hostname=None,
    target_condition="",
    priority=0,
    labels=None,
    metrics=None,
    resource_group_name=None,
    login=None,
    auth_type_dataplane=None,
):
    # Command entry point: create a configuration of type ConfigType.adm
    # through _iot_hub_configuration_create.
    hub_target = IotHubDiscovery(cmd).get_target(
        resource_name=hub_name_or_hostname,
        resource_group_name=resource_group_name,
        login=login,
        auth_type=auth_type_dataplane,
    )
    return _iot_hub_configuration_create(
        target=hub_target,
        config_id=config_id,
        content=content,
        config_type=ConfigType.adm,
        custom_labels=custom_labels,
        custom_metric_queries=custom_metric_queries,
        target_condition=target_condition,
        priority=priority,
        labels=labels,
        metrics=metrics,
    )


def _iot_hub_configuration_create(
    target,
    config_id,
    content,
    config_type=ConfigType.adm,
    custom_labels=None,
    custom_metric_queries=None,
    target_condition="",
    priority=0,
    labels=None,
    metrics=None
):
    """Create (or update) a hub configuration from the given payload.

    Steps, in order:
      * the configuration id is lower-cased;
      * ``content`` is parsed and reduced by ``_process_config_content``;
      * a payload with "module_content" requires ``target_condition`` to start
        (case-insensitively) with "from devices.modules where";
      * ``metrics`` (JSON, optionally nested under "metrics", must contain
        "queries") takes precedence over ``custom_metric_queries``;
      * ``labels`` (JSON) takes precedence over ``custom_labels``.

    Raises InvalidArgumentValueError for a bad target condition or a metrics
    document without "queries". A CloudError from the service call is passed
    to handle_service_exception.
    """
    from azext_iot.sdk.iothub.service.models import (
        Configuration,
        ConfigurationContent,
        ConfigurationMetrics,
    )
    service_sdk = SdkResolver(target=target).get_sdk(SdkType.service_sdk)

    logger.debug("ensuring lowercase configuration Id...")
    config_id = config_id.lower()

    parsed_content = process_json_arg(content, argument_name="content")
    reduced_content = _process_config_content(parsed_content, config_type)
    if "module_content" in reduced_content:
        module_prefix = "from devices.modules where"
        if not target_condition.lower().startswith(module_prefix):
            raise InvalidArgumentValueError(
                "The target condition for a module configuration must start with '{}'".format(
                    module_prefix
                )
            )

    if metrics:
        queries_key = "queries"
        metrics = process_json_arg(metrics, argument_name="metrics")

        # Accept both {"metrics": {"queries": ...}} and {"queries": ...}.
        if "metrics" in metrics:
            metrics = metrics["metrics"]
        if queries_key not in metrics:
            raise InvalidArgumentValueError(
                "metrics json must include the '{}' property".format(queries_key)
            )
        metrics = metrics[queries_key]
    elif custom_metric_queries:
        metrics = assemble_nargs_to_dict(custom_metric_queries)

    if labels:
        labels = process_json_arg(labels, argument_name="labels")
    elif custom_labels:
        labels = assemble_nargs_to_dict(custom_labels)

    configuration = Configuration(
        id=config_id,
        schema_version="2.0",
        labels=labels,
        content=ConfigurationContent(**reduced_content),
        metrics=ConfigurationMetrics(queries=metrics),
        target_condition=target_condition,
        etag="*",
        priority=priority,
    )

    try:
        return service_sdk.configuration.create_or_update(
            id=config_id, configuration=configuration
        )
    except CloudError as e:
        handle_service_exception(e)


def _process_config_content(content, config_type):
    """Reduce a configuration payload to the single section the service needs.

    A payload wrapped in a top-level "content" key is unwrapped first.

    * ``ConfigType.adm``: exactly one of "deviceContent" / "moduleContent"
      must be present; it is returned under its snake_case name.
    * ``ConfigType.edge`` / ``ConfigType.layered``: "modulesContent" (or the
      deprecated "moduleContent", with a warning) is returned under the
      snake_case form of "modulesContent". Only ``ConfigType.edge`` payloads
      are schema validated.

    Raises InvalidArgumentValueError when the required property is missing
    (or, for adm, when both properties are supplied).
    """
    from knack.util import to_snake_case

    # Supports scenario where configuration payload is contained in 'content' key
    if "content" in content:
        content = content["content"]

    if config_type == ConfigType.adm:
        adm_keys = ["deviceContent", "moduleContent"]
        supplied = [key for key in adm_keys if key in content]
        if len(supplied) == 1:
            # Only the recognised section is kept; other properties are dropped.
            only_key = supplied[0]
            return {to_snake_case(only_key): content[only_key]}

        raise InvalidArgumentValueError(
            "Automatic device configuration payloads require property: {}".format(
                " or ".join(map(str, adm_keys))
            )
        )

    if config_type in (ConfigType.edge, ConfigType.layered):
        edge_key = "modulesContent"
        deprecated_key = "moduleContent"

        if edge_key in content:
            modules = content[edge_key]
        elif deprecated_key in content:
            logger.warning(
                "'%s' is deprecated for edge deployments. Use '%s' instead - request is still processing...",
                deprecated_key,
                edge_key,
            )
            modules = content[deprecated_key]
        else:
            raise InvalidArgumentValueError(
                "Edge deployment payloads require property: {}".format(edge_key)
            )

        # Schema based validation currently for IoT edge deployment only
        if config_type == ConfigType.edge:
            _validate_payload_schema({edge_key: modules})

        return {to_snake_case(edge_key): modules}


def _validate_payload_schema(content):
    """Validate the system modules of an edge deployment against JSON schemas.

    For "$edgeAgent" and "$edgeHub" (in that order), when the module declares
    ``properties.desired.schemaVersion`` the matching schema file under
    EDGE_DEPLOYMENT_ROOT_SCHEMAS_PATH is loaded and the module definition is
    validated against it. Validation of a module is skipped (with an info log)
    when the schema file does not exist or cannot be read/parsed.

    Raises ValidationError carrying the JSON-formatted validation errors.
    """
    import json
    from os.path import join
    from azext_iot.models.validators import JsonSchemaType, JsonSchemaValidator
    from azext_iot.constants import EDGE_DEPLOYMENT_ROOT_SCHEMAS_PATH as root_schema_path
    from azext_iot.common.utility import shell_safe_json_parse

    # Schema file-name templates per system module; "{}" takes the schema version.
    schema_file_templates = {
        "$edgeAgent": "azure-iot-edgeagent-deployment-{}.json",
        "$edgeHub": "azure-iot-edgehub-deployment-{}.json",
    }

    modules_content = content["modulesContent"]

    for sys_module, file_template in schema_file_templates.items():
        if sys_module not in modules_content:
            continue
        module_definition = modules_content[sys_module]
        if "properties.desired" not in module_definition:
            continue
        desired = module_definition["properties.desired"]
        if "schemaVersion" not in desired:
            continue

        schema_path = join(root_schema_path, file_template.format(desired["schemaVersion"]))

        logger.info("Attempting to fetch schema content from %s...", schema_path)
        if not exists(schema_path):
            logger.info("Invalid schema path %s, skipping validation...", schema_path)
            continue

        # Best effort: an unreadable or unparsable schema only skips validation.
        try:
            schema_text = str(read_file_content(schema_path))
            schema_definition = shell_safe_json_parse(schema_text)
        except Exception:
            logger.info(
                "Unable to fetch schema content from %s skipping validation...",
                schema_path,
            )
            continue

        logger.info(f"Validating {sys_module} of deployment payload against schema...")

        # Draft 4 unless the schema's "$schema" value mentions draft-07.
        is_draft7 = (
            "$schema" in schema_definition
            and "/draft-07/" in schema_definition["$schema"]
        )
        draft_version = JsonSchemaType.draft7 if is_draft7 else JsonSchemaType.draft4

        validator = JsonSchemaValidator(schema_definition, draft_version)
        errors = validator.validate({sys_module: module_definition})
        if errors:
            # Pretty printing schema validation errors
            raise ValidationError(
                json.dumps(
                    {"validationErrors": errors},
                    separators=(",", ":"),
                    indent=2,
                )
            )


def iot_hub_configuration_update(
    cmd,
    config_id,
    parameters,
    hub_name_or_hostname=None,
    resource_group_name=None,
    login=None,
    etag=None,
    auth_type_dataplane=None,
):
    # Command entry point: push an edited configuration document
    # (`parameters`) back to the hub with create_or_update.
    # The If-Match header carries the quoted etag, or "*" when no etag is given.
    # A CloudError is passed to handle_service_exception; AttributeError and
    # TypeError are re-raised as CLIInternalError.
    from azext_iot.sdk.iothub.service.models import Configuration
    from azext_iot.common.utility import verify_transform

    discovery = IotHubDiscovery(cmd)
    target = discovery.get_target(
        resource_name=hub_name_or_hostname,
        resource_group_name=resource_group_name,
        login=login,
        auth_type=auth_type_dataplane,
    )
    resolver = SdkResolver(target=target)
    service_sdk = resolver.get_sdk(SdkType.service_sdk)

    try:
        headers = {}
        headers["If-Match"] = '"{}"'.format(etag if etag else "*")
        verify = {"metrics": dict, "metrics.queries": dict, "content": dict}
        # Labels are optional: only type-check them when supplied.
        if parameters.get("labels"):
            verify["labels"] = dict
        verify_transform(parameters, verify)
        config = Configuration(
            id=parameters["id"],
            schema_version=parameters["schemaVersion"],
            # .get() keeps the optional treatment above consistent: a document
            # without a "labels" key no longer fails with an uncaught KeyError.
            labels=parameters.get("labels"),
            content=parameters["content"],
            metrics=parameters.get("metrics", None),
            target_condition=parameters["targetCondition"],
            priority=parameters["priority"],
        )
        return service_sdk.configuration.create_or_update(
            id=config_id, configuration=config, custom_headers=headers
        )
    except CloudError as e:
        handle_service_exception(e)
    except (AttributeError, TypeError) as err:
        raise CLIInternalError(err) from err


def iot_hub_configuration_show(
    cmd,
    config_id,
    hub_name_or_hostname=None,
    resource_group_name=None,
    login=None,
    auth_type_dataplane=None,
):
    # Command entry point: resolve the hub target from the CLI arguments, then
    # return the configuration produced by _iot_hub_configuration_show.
    hub_target = IotHubDiscovery(cmd).get_target(
        resource_name=hub_name_or_hostname,
        resource_group_name=resource_group_name,
        login=login,
        auth_type=auth_type_dataplane,
    )
    return _iot_hub_configuration_show(target=hub_target, config_id=config_id)


def _iot_hub_configuration_show(target, config_id):
    """Return a configuration as the parsed JSON body of ``configuration.get``.

    A CloudError is passed to handle_service_exception.
    """
    service_sdk = SdkResolver(target=target).get_sdk(SdkType.service_sdk)

    try:
        raw_result = service_sdk.configuration.get(id=config_id, raw=True)
        return raw_result.response.json()
    except CloudError as e:
        handle_service_exception(e)


def iot_hub_configuration_list(
    cmd,
    hub_name_or_hostname=None,
    top=None,
    resource_group_name=None,
    login=None,
    auth_type_dataplane=None,
):
    # Command entry point: list configurations whose content has a non-None
    # "deviceContent" or "moduleContent", limited to the first `top` entries.
    hub_target = IotHubDiscovery(cmd).get_target(
        resource_name=hub_name_or_hostname,
        resource_group_name=resource_group_name,
        login=login,
        auth_type=auth_type_dataplane,
    )
    configurations = _iot_hub_configuration_list(hub_target)

    adm_content_keys = ("deviceContent", "moduleContent")
    matching = [
        config
        for config in configurations
        if any(config["content"].get(key) is not None for key in adm_content_keys)
    ]
    # Slicing with top=None keeps the whole list.
    return matching[:top]


def iot_edge_deployment_list(
    cmd,
    hub_name_or_hostname=None,
    top=None,
    resource_group_name=None,
    login=None,
    auth_type_dataplane=None,
):
    # Command entry point: list configurations whose content has a non-None
    # "modulesContent", limited to the first `top` entries.
    hub_target = IotHubDiscovery(cmd).get_target(
        resource_name=hub_name_or_hostname,
        resource_group_name=resource_group_name,
        login=login,
        auth_type=auth_type_dataplane,
    )

    deployments = []
    for config in _iot_hub_configuration_list(hub_target):
        if config["content"].get("modulesContent") is not None:
            deployments.append(config)
    # Slicing with top=None keeps the whole list.
    return deployments[:top]


def _iot_hub_configuration_list(target):
    """Return all configurations as the parsed JSON body of the service call.

    An empty result is logged at info level using ``target["name"]``. A
    CloudError is passed to handle_service_exception.
    """
    service_sdk = SdkResolver(target=target).get_sdk(SdkType.service_sdk)

    try:
        raw_result = service_sdk.configuration.get_configurations(raw=True)
        configurations = raw_result.response.json()
        if not configurations:
            logger.info('No configurations found on hub "%s".', target["name"])
        return configurations
    except CloudError as e:
        handle_service_exception(e)


def iot_hub_configuration_delete(
    cmd,
    config_id,
    hub_name_or_hostname=None,
    resource_group_name=None,
    login=None,
    etag=None,
    auth_type_dataplane=None,
):
    # Command entry point: resolve the hub target from the CLI arguments, then
    # delete the configuration via _iot_hub_configuration_delete.
    hub_target = IotHubDiscovery(cmd).get_target(
        resource_name=hub_name_or_hostname,
        resource_group_name=resource_group_name,
        login=login,
        auth_type=auth_type_dataplane,
    )
    return _iot_hub_configuration_delete(
        target=hub_target, config_id=config_id, etag=etag
    )


def _iot_hub_configuration_delete(target, config_id, etag=None):
    """Delete a configuration.

    The If-Match header carries the quoted etag, or "*" when no etag is given.
    Nothing is returned; a CloudError is passed to handle_service_exception.
    """
    service_sdk = SdkResolver(target=target).get_sdk(SdkType.service_sdk)

    try:
        if_match = {"If-Match": '"{}"'.format(etag if etag else "*")}
        service_sdk.configuration.delete(id=config_id, custom_headers=if_match)
    except CloudError as e:
        handle_service_exception(e)


def iot_edge_deployment_metric_show(
    cmd,
    config_id,
    metric_id,
    metric_type="user",
    hub_name_or_hostname=None,
    resource_group_name=None,
    login=None,
    auth_type_dataplane=None,
):
    # Command entry point: edge deployments reuse the configuration metric
    # implementation, so every argument is forwarded unchanged.
    forwarded = {
        "config_id": config_id,
        "metric_id": metric_id,
        "metric_type": metric_type,
        "hub_name_or_hostname": hub_name_or_hostname,
        "resource_group_name": resource_group_name,
        "login": login,
        "auth_type_dataplane": auth_type_dataplane,
    }
    return iot_hub_configuration_metric_show(cmd, **forwarded)


def iot_hub_configuration_metric_show(
    cmd,
    config_id,
    metric_id,
    metric_type="user",
    hub_name_or_hostname=None,
    resource_group_name=None,
    login=None,
    auth_type_dataplane=None,
):
    # Command entry point: evaluate one metric query of a configuration.
    # The query is looked up under "systemMetrics" when metric_type is
    # "system", otherwise under "metrics", and run through _execute_query with
    # the service SDK's query.get_twins method.
    # Returns {"metric": <id>, "query": <query text>, "result": <query result>}.
    # Raises InvalidArgumentValueError when the metric is not defined on the
    # configuration; a CloudError is passed to handle_service_exception.
    discovery = IotHubDiscovery(cmd)
    target = discovery.get_target(
        resource_name=hub_name_or_hostname,
        resource_group_name=resource_group_name,
        login=login,
        auth_type=auth_type_dataplane,
    )
    resolver = SdkResolver(target=target)
    service_sdk = resolver.get_sdk(SdkType.service_sdk)

    try:
        config = _iot_hub_configuration_show(target=target, config_id=config_id)

        metrics_section = "systemMetrics" if metric_type == "system" else "metrics"
        # .get("queries") returns None when the section has no query map; fall
        # back to an empty mapping so the lookup below reports an undefined
        # metric instead of failing with a TypeError on `in None`.
        metric_collection = config[metrics_section].get("queries") or {}

        if metric_id not in metric_collection:
            raise InvalidArgumentValueError(
                "The {} metric '{}' is not defined in the configuration '{}'".format(
                    metric_type, metric_id, config_id
                )
            )

        metric_query = metric_collection[metric_id]
        metric_result = _execute_query(
            [metric_query], service_sdk.query.get_twins, None
        )

        return {
            "metric": metric_id,
            "query": metric_query,
            "result": metric_result,
        }
    except CloudError as e:
        handle_service_exception(e)


# Device Twin


def iot_device_twin_show(
    cmd,
    device_id,
    hub_name_or_hostname=None,
    resource_group_name=None,
    login=None,
    auth_type_dataplane=None,
):
    # Command entry point: resolve the hub target from the CLI arguments, then
    # return the device twin produced by _iot_device_twin_show.
    hub_target = IotHubDiscovery(cmd).get_target(
        resource_name=hub_name_or_hostname,
        resource_group_name=resource_group_name,
        login=login,
        auth_type=auth_type_dataplane,
    )
    return _iot_device_twin_show(target=hub_target, device_id=device_id)


def _iot_device_twin_show(target, device_id):
    """Return the device twin as the parsed JSON body of ``devices.get_twin``.

    A CloudError is passed to handle_service_exception.
    """
    service_sdk = SdkResolver(target=target).get_sdk(SdkType.service_sdk)

    try:
        raw_result = service_sdk.devices.get_twin(id=device_id, raw=True)
        return raw_result.response.json()
    except CloudError as e:
        handle_service_exception(e)


def iot_device_twin_list(
    cmd,
    hub_name_or_hostname=None,
    top=1000,
    edge_enabled=False,
    resource_group_name=None,
    login=None,
    auth_type_dataplane=None,
):
    # Command entry point: resolve the hub target from the CLI arguments, then
    # return the device twins produced by _iot_device_twin_list.
    hub_target = IotHubDiscovery(cmd).get_target(
        resource_name=hub_name_or_hostname,
        resource_group_name=resource_group_name,
        login=login,
        auth_type=auth_type_dataplane,
    )
    return _iot_device_twin_list(
        target=hub_target, edge_enabled=edge_enabled, top=top
    )


def _iot_device_twin_list(target, edge_enabled=False, top=1000):
    """Query device twins, optionally restricted to edge-enabled devices.

    Runs "select * from devices" (with a ``capabilities.iotEdge = true``
    filter when ``edge_enabled`` is truthy) through ``_iot_query`` with the
    given ``top``. An empty result is logged at info level using
    ``target["name"]``. The query result is returned as-is.
    """
    query = "select * from devices"
    if edge_enabled:
        query = "select * from devices where capabilities.iotEdge = true"

    twins = _iot_query(target=target, query_command=query, top=top)
    if not twins:
        logger.info('No registered devices found on hub "%s".', target["name"])
    return twins


def iot_twin_update_custom(instance, desired=None, tags=None):
    # Build a twin patch from the `desired` and/or `tags` arguments.
    # Each supplied (truthy) argument is parsed with process_json_arg and placed
    # under "properties" -> "desired" or "tags" respectively. When neither is
    # supplied, `instance` is returned untouched.
    patch = {}
    if desired:
        patch["properties"] = {"desired": process_json_arg(desired, "desired")}
    if tags:
        patch["tags"] = process_json_arg(tags, "tags")

    # The patch is non-empty exactly when at least one argument was supplied.
    return patch or instance


def iot_device_twin_update(
    cmd,
    device_id,
    parameters,
    hub_name_or_hostname=None,
    resource_group_name=None,
    login=None,
    etag=None,
    auth_type_dataplane=None,
):
    # Command entry point: resolve the hub target from the CLI arguments, then
    # apply `parameters` to the device twin via _iot_device_twin_update.
    hub_target = IotHubDiscovery(cmd).get_target(
        resource_name=hub_name_or_hostname,
        resource_group_name=resource_group_name,
        login=login,
        auth_type=auth_type_dataplane,
    )
    return _iot_device_twin_update(
        target=hub_target, device_id=device_id, parameters=parameters, etag=etag
    )


def _iot_device_twin_update(
    target,
    device_id,
    parameters,
    etag=None,
):
    """Update a device twin with ``parameters``.

    Before the request, ``verify_transform`` is run over ``parameters`` with
    "properties.desired" and/or "tags" expected to be dicts, but only for the
    sections that are present and truthy. The If-Match header carries the
    quoted etag, or "*" when no etag is given.

    A CloudError is passed to handle_service_exception; AttributeError and
    TypeError are re-raised as CLIInternalError.
    """
    from azext_iot.common.utility import verify_transform

    service_sdk = SdkResolver(target=target).get_sdk(SdkType.service_sdk)

    try:
        if_match = {"If-Match": '"{}"'.format(etag if etag else "*")}

        expected_types = {}
        properties = parameters.get("properties")
        if properties and properties.get("desired"):
            expected_types["properties.desired"] = dict
        if parameters.get("tags"):
            expected_types["tags"] = dict
        verify_transform(parameters, expected_types)

        return service_sdk.devices.update_twin(
            id=device_id, device_twin_info=parameters, custom_headers=if_match
        )
    except CloudError as e:
        handle_service_exception(e)
    except (AttributeError, TypeError) as err:
        raise CLIInternalError(err)


def iot_device_twin_replace(
    cmd,
    device_id,
    target_json,
    hub_name_or_hostname=None,
    resource_group_name=None,
    login=None,
    etag=None,
    auth_type_dataplane=None,
):
    # Command entry point: resolve the hub target from the CLI arguments, then
    # replace the device twin with `target_json` via _iot_device_twin_replace.
    hub_target = IotHubDiscovery(cmd).get_target(
        resource_name=hub_name_or_hostname,
        resource_group_name=resource_group_name,
        login=login,
        auth_type=auth_type_dataplane,
    )
    return _iot_device_twin_replace(
        target=hub_target, device_id=device_id, target_json=target_json, etag=etag
    )


def _iot_device_twin_replace(target, device_id, target_json, etag=None):
    """Replace the twin of `device_id` against an already resolved target.

    `target_json` is run through `process_json_arg` (argument name "json")
    and sent with an `If-Match` header built from `etag`, or `"*"` when no
    etag is given. Service `CloudError`s are routed to
    `handle_service_exception`.
    """
    service_sdk = SdkResolver(target=target).get_sdk(SdkType.service_sdk)

    try:
        twin_document = process_json_arg(target_json, argument_name="json")
        request_headers = {"If-Match": '"{}"'.format(etag or "*")}
        return service_sdk.devices.replace_twin(
            id=device_id,
            device_twin_info=twin_document,
            custom_headers=request_headers,
        )
    except CloudError as e:
        handle_service_exception(e)


def iot_device_method(
    cmd,
    device_id,
    method_name,
    hub_name_or_hostname=None,
    method_payload="{}",
    timeout=30,
    resource_group_name=None,
    login=None,
    auth_type_dataplane=None,
):
    """Invoke the direct method `method_name` on a device.

    `timeout` must lie between `METHOD_INVOKE_MIN_TIMEOUT_SEC` and
    `METHOD_INVOKE_MAX_TIMEOUT_SEC` (inclusive); it is used as both the
    response and the connect timeout of the request. A non-empty
    `method_payload` is parsed with `process_json_arg`.

    Raises:
        InvalidArgumentValueError: `timeout` is outside the allowed range.
    """
    from azext_iot.constants import (
        METHOD_INVOKE_MAX_TIMEOUT_SEC,
        METHOD_INVOKE_MIN_TIMEOUT_SEC,
    )

    # Validate the timeout before any hub lookup is attempted.
    if timeout > METHOD_INVOKE_MAX_TIMEOUT_SEC:
        raise InvalidArgumentValueError(
            "timeout must not be over {} seconds".format(METHOD_INVOKE_MAX_TIMEOUT_SEC)
        )
    if timeout < METHOD_INVOKE_MIN_TIMEOUT_SEC:
        raise InvalidArgumentValueError(
            "timeout must be at least {} seconds".format(METHOD_INVOKE_MIN_TIMEOUT_SEC)
        )

    hub_target = IotHubDiscovery(cmd).get_target(
        resource_name=hub_name_or_hostname,
        resource_group_name=resource_group_name,
        login=login,
        auth_type=auth_type_dataplane,
    )
    service_sdk = SdkResolver(target=hub_target).get_sdk(SdkType.service_sdk)

    # Prevent msrest locking up shell
    service_sdk.config.retry_policy.retries = 1
    try:
        payload = (
            process_json_arg(method_payload, argument_name="method-payload")
            if method_payload
            else method_payload
        )
        return service_sdk.devices.invoke_method(
            device_id=device_id,
            direct_method_request={
                "methodName": method_name,
                "payload": payload,
                "responseTimeoutInSeconds": timeout,
                "connectTimeoutInSeconds": timeout,
            },
            timeout=timeout,
        )
    except CloudError as e:
        handle_service_exception(e)


# Device Module Method Invoke


def iot_device_module_method(
    cmd,
    device_id,
    module_id,
    method_name,
    hub_name_or_hostname=None,
    method_payload="{}",
    timeout=30,
    resource_group_name=None,
    login=None,
    auth_type_dataplane=None,
):
    """Invoke the direct method `method_name` on a module of a device.

    `timeout` must lie between `METHOD_INVOKE_MIN_TIMEOUT_SEC` and
    `METHOD_INVOKE_MAX_TIMEOUT_SEC` (inclusive); it is used as both the
    response and the connect timeout of the request. A non-empty
    `method_payload` is parsed with `process_json_arg`.

    Raises:
        InvalidArgumentValueError: `timeout` is outside the allowed range.
    """
    from azext_iot.constants import (
        METHOD_INVOKE_MAX_TIMEOUT_SEC,
        METHOD_INVOKE_MIN_TIMEOUT_SEC,
    )

    if timeout > METHOD_INVOKE_MAX_TIMEOUT_SEC:
        raise InvalidArgumentValueError(
            "timeout must not be over {} seconds".format(METHOD_INVOKE_MAX_TIMEOUT_SEC)
        )
    if timeout < METHOD_INVOKE_MIN_TIMEOUT_SEC:
        # Lower-bound violation: report the minimum, matching iot_device_method.
        raise InvalidArgumentValueError(
            "timeout must be at least {} seconds".format(METHOD_INVOKE_MIN_TIMEOUT_SEC)
        )

    discovery = IotHubDiscovery(cmd)
    target = discovery.get_target(
        resource_name=hub_name_or_hostname,
        resource_group_name=resource_group_name,
        login=login,
        auth_type=auth_type_dataplane,
    )
    resolver = SdkResolver(target=target)
    service_sdk = resolver.get_sdk(SdkType.service_sdk)

    # Prevent msrest locking up shell
    service_sdk.config.retry_policy.retries = 1
    try:
        if method_payload:
            method_payload = process_json_arg(
                method_payload, argument_name="method-payload"
            )

        request_body = {
            "methodName": method_name,
            "payload": method_payload,
            "responseTimeoutInSeconds": timeout,
            "connectTimeoutInSeconds": timeout,
        }

        return service_sdk.modules.invoke_method(
            device_id=device_id,
            module_id=module_id,
            direct_method_request=request_body,
            timeout=timeout,
        )
    except CloudError as e:
        handle_service_exception(e)


# Utility


def iot_get_sas_token(
    cmd,
    hub_name_or_hostname=None,
    device_id=None,
    policy_name="iothubowner",
    key_type="primary",
    duration=3600,
    resource_group_name=None,
    login=None,
    module_id=None,
    auth_type_dataplane=None,
    connection_string=None,
):
    """Generate a SAS token and return it in a single-entry dict.

    The dict key is `DeviceAuthApiType.sas.value`. When `connection_string`
    is given the token is derived from it alone; otherwise it is built from
    the hub, device or module identified by the remaining arguments.
    `key_type` and `policy_name` are compared case-insensitively.

    Raises:
        ArgumentUsageError: an unsupported combination of `login`,
            `policy_name`, `key_type`, `device_id` and `module_id` was given.
    """
    key_type = key_type.lower()
    policy_name = policy_name.lower()

    if login:
        if policy_name != "iothubowner":
            raise ArgumentUsageError(
                "You are unable to change the sas policy with a hub connection string login."
            )
        if key_type != "primary" and not device_id:
            raise ArgumentUsageError(
                "For non-device sas, you are unable to change the key type with a connection string login."
            )
    if module_id and not device_id:
        raise ArgumentUsageError(
            "You are unable to get sas token for module without device information."
        )

    if connection_string:
        token_auth = _iot_build_sas_token_from_cs(connection_string, duration)
    else:
        token_auth = _iot_build_sas_token(
            cmd,
            hub_name_or_hostname,
            device_id,
            module_id,
            policy_name,
            key_type,
            duration,
            resource_group_name,
            login,
            auth_type_dataplane,
        )

    return {DeviceAuthApiType.sas.value: token_auth.generate_sas_token()}


def _iot_build_sas_token_from_cs(connection_string, duration=3600):
    """Build a `SasTokenAuthentication` from a connection string.

    The module, device and IoT Hub parsers are tried in that order; a parser
    that raises `ValueError` is skipped. The resource URI is the host name,
    extended with `/devices/<id>` or `/devices/<id>/modules/<id>` for device
    and module connection strings respectively.

    Raises:
        InvalidArgumentValueError: no parser accepted the connection string.
    """
    unsupported_msg = "Given Connection String was not in a supported format."
    policy = None

    # Most specific format first.
    candidate_parsers = (
        ConnectionStringParser.Module,
        ConnectionStringParser.Device,
        ConnectionStringParser.IotHub,
    )

    for cs_parser in candidate_parsers:
        try:
            fields = cs_parser(connection_string)

            # Only hub-level connection strings are expected to name a policy.
            if "SharedAccessKeyName" in fields:
                policy = fields["SharedAccessKeyName"]
            shared_key = fields["SharedAccessKey"]

            if cs_parser == ConnectionStringParser.IotHub:
                resource_uri = fields["HostName"]
            elif cs_parser == ConnectionStringParser.Module:
                resource_uri = "{}/devices/{}/modules/{}".format(
                    fields["HostName"], fields["DeviceId"], fields["ModuleId"]
                )
            elif cs_parser == ConnectionStringParser.Device:
                resource_uri = "{}/devices/{}".format(
                    fields["HostName"], fields["DeviceId"]
                )
            else:
                raise InvalidArgumentValueError(unsupported_msg)

            return SasTokenAuthentication(resource_uri, policy, shared_key, duration)
        except ValueError:
            continue

    raise InvalidArgumentValueError(unsupported_msg)


def _iot_build_sas_token(
    cmd,
    hub_name_or_hostname=None,
    device_id=None,
    module_id=None,
    policy_name="iothubowner",
    key_type="primary",
    duration=3600,
    resource_group_name=None,
    login=None,
    auth_type_dataplane=None,
):
    """Build a `SasTokenAuthentication` for a hub, device or module.

    Without `device_id` the token is scoped to the hub and signed with the
    target's primary or secondary policy key. With `device_id` (and
    optionally `module_id`) the entity is read from the registry and its
    symmetric key is used; no policy name is attached in that case.

    Raises:
        CLIInternalError: the device/module connection string could not be
            parsed for a shared access key.
    """
    from azext_iot.common._azure import (
        parse_iot_device_connection_string,
        parse_iot_device_module_connection_string,
    )

    # There is no dataplane operation for a pure IoT Hub sas token
    if device_id is None and module_id is None:
        auth_type_dataplane = "key"

    target = IotHubDiscovery(cmd).get_target(
        resource_name=hub_name_or_hostname,
        resource_group_name=resource_group_name,
        policy_name=policy_name,
        login=login,
        auth_type=auth_type_dataplane,
    )

    if not device_id:
        # Hub-scoped token signed with the selected policy key.
        hub_uri = target["entity"]
        hub_policy = target["policy"]
        hub_key = (
            target["primarykey"] if key_type == "primary" else target["secondarykey"]
        )
        return SasTokenAuthentication(hub_uri, hub_policy, hub_key, duration)

    logger.info(
        'Obtaining device "%s" details from registry, using IoT Hub policy "%s"',
        device_id,
        policy_name,
    )
    # The device is always fetched, even when a module token is requested.
    device = _iot_device_show(target, device_id)

    if module_id:
        module = _iot_device_module_show(target, device_id, module_id)
        entity_cs = _build_device_or_module_connection_string(
            entity=module, key_type=key_type
        )
        resource_uri = "{}/devices/{}/modules/{}".format(
            target["entity"], device_id, module_id
        )
        cs_parser = parse_iot_device_module_connection_string
        no_sas_msg = "This module does not support SAS auth."
    else:
        entity_cs = _build_device_or_module_connection_string(
            entity=device, key_type=key_type
        )
        resource_uri = "{}/devices/{}".format(target["entity"], device_id)
        cs_parser = parse_iot_device_connection_string
        no_sas_msg = "This device does not support SAS auth."

    try:
        parsed_cs = cs_parser(entity_cs)
    except ValueError as e:
        logger.debug(e)
        raise CLIInternalError(no_sas_msg)

    return SasTokenAuthentication(
        resource_uri, None, parsed_cs["SharedAccessKey"], duration
    )


def _build_device_or_module_connection_string(entity, key_type="primary"):
    """Compose a connection string for a device or module registry entity.

    An entity without a `moduleId` is treated as a device. SAS-authenticated
    entities contribute `SharedAccessKey=<key>` (the primary key when
    `key_type` is "primary", the secondary key otherwise); certificate
    authority and self-signed entities contribute `x509=true`.

    Raises:
        CLIInternalError: the entity's authentication type is none of these.
    """
    module_id = entity.get("moduleId")

    authentication = entity["authentication"]
    auth_kind = authentication["type"].lower()
    x509_kinds = (
        DeviceAuthApiType.certificateAuthority.value.lower(),
        DeviceAuthApiType.selfSigned.value.lower(),
    )

    if auth_kind == DeviceAuthApiType.sas.value.lower():
        key_field = "primaryKey" if key_type == "primary" else "secondaryKey"
        credential = "SharedAccessKey={}".format(
            authentication["symmetricKey"][key_field]
        )
    elif auth_kind in x509_kinds:
        credential = "x509=true"
    else:
        raise CLIInternalError("Unable to form target connection string")

    if module_id is None:
        return "HostName={};DeviceId={};{}".format(
            entity.get("hub"), entity.get("deviceId"), credential
        )
    return "HostName={};DeviceId={};ModuleId={};{}".format(
        entity.get("hub"), entity.get("deviceId"), entity.get("moduleId"), credential
    )


def iot_get_device_connection_string(
    cmd,
    device_id,
    hub_name_or_hostname=None,
    key_type="primary",
    resource_group_name=None,
    login=None,
    auth_type_dataplane=None,
):
    """Return `{"connectionString": ...}` for the device `device_id`.

    The device is fetched with `iot_device_show` and the string is composed
    by `_build_device_or_module_connection_string` using `key_type`.
    """
    device_entity = iot_device_show(
        cmd,
        device_id,
        hub_name_or_hostname=hub_name_or_hostname,
        resource_group_name=resource_group_name,
        login=login,
        auth_type_dataplane=auth_type_dataplane,
    )
    return {
        "connectionString": _build_device_or_module_connection_string(
            device_entity, key_type
        )
    }


def iot_get_module_connection_string(
    cmd,
    device_id,
    module_id,
    hub_name_or_hostname=None,
    key_type="primary",
    resource_group_name=None,
    login=None,
    auth_type_dataplane=None,
):
    """Return `{"connectionString": ...}` for module `module_id` of a device.

    The module is fetched with `iot_device_module_show` and the string is
    composed by `_build_device_or_module_connection_string` using `key_type`.
    """
    module_entity = iot_device_module_show(
        cmd,
        device_id,
        module_id,
        resource_group_name=resource_group_name,
        hub_name_or_hostname=hub_name_or_hostname,
        login=login,
        auth_type_dataplane=auth_type_dataplane,
    )
    return {
        "connectionString": _build_device_or_module_connection_string(
            module_entity, key_type
        )
    }


def _get_service_sdk(
    cmd,
    hub_name_or_hostname: str,
    resource_group_name: str = None,
    login=None,
    auth_type_dataplane=None,
):
    """Resolve the IoT Hub target and return its service SDK client."""
    hub_target = IotHubDiscovery(cmd).get_target(
        resource_name=hub_name_or_hostname,
        resource_group_name=resource_group_name,
        login=login,
        auth_type=auth_type_dataplane,
    )
    return SdkResolver(target=hub_target).get_sdk(SdkType.service_sdk)


def _generate_blob_container_uri(
    cmd,
    storage_account_name: str,
    blob_container_name: str,
    identity: str = None,
):
    """Build the URI of a blob container in the given storage account.

    The account's primary blob endpoint is looked up through the embedded
    CLI and the container name is appended to it. When no `identity` is
    supplied, a SAS token generated from the account connection string
    (read, write, create, add and delete permissions) is appended as the
    query string.

    Raises:
        ClientRequestError: the storage account or container name is None.
    """
    from azext_iot.common.embedded_cli import EmbeddedCLI

    if storage_account_name is None or blob_container_name is None:
        raise ClientRequestError(
            "Storage account and Blob container names are required to generate blob container uri"
        )

    cli = EmbeddedCLI(cli_ctx=cmd.cli_ctx)

    account_info = cli.invoke(
        "storage account show -n '{}'".format(storage_account_name)
    ).as_json()
    container_uri = account_info["primaryEndpoints"]["blob"] + blob_container_name

    if identity:
        # Identity-based access: the plain container URI is returned.
        return container_uri

    cstring_info = cli.invoke(
        "storage account show-connection-string -n '{}'".format(storage_account_name)
    ).as_json()
    sas_token = generate_storage_account_sas_token(
        cstring_info["connectionString"],
        read=True,
        write=True,
        create=True,
        add=True,
        delete=True,
    )
    return container_uri + "?" + sas_token


def _create_export_import_job_properties(
    job_type: str,
    input_blob_container_uri: str = None,
    output_blob_container_uri: str = None,
    include_keys: bool = False,
    identity: str = None,
):
    """Assemble the `JobProperties` for a device export or import job.

    A container URI argument that names an existing local path is replaced
    by that file's content. Export jobs exclude keys unless `include_keys`
    is set; import jobs additionally carry the input container URI. Storage
    authentication is key based when `identity` is None and identity based
    otherwise, with a user-assigned `ManagedIdentity` attached unless the
    identity is "[system]".

    Raises:
        ClientRequestError: `job_type` is neither the export nor import type.
    """
    from azext_iot.common.shared import AuthenticationType
    from azext_iot.sdk.iothub.service.models import (
        JobProperties,
        ManagedIdentity
    )

    def _uri_or_file_content(value):
        # The argument may be the URI itself or a path to a file holding it.
        return read_file_content(value) if exists(value) else value

    props = JobProperties()

    if job_type == JobType.exportDevices.value:
        props.exclude_keys_in_export = not include_keys
    elif job_type == JobType.importDevices.value:
        props.input_blob_container_uri = _uri_or_file_content(
            input_blob_container_uri
        )
    else:
        raise ClientRequestError(
            "Invalid job type: {}".format(job_type)
        )
    props.type = job_type
    props.output_blob_container_uri = _uri_or_file_content(output_blob_container_uri)

    if identity is None:
        props.storage_authentication_type = AuthenticationType.keyBased.name
        return props

    props.storage_authentication_type = AuthenticationType.identityBased.name
    if identity != "[system]":
        props.identity = ManagedIdentity(user_assigned_identity=identity)
    return props


def iot_device_export(
    cmd,
    hub_name_or_hostname: str = None,
    blob_container_uri: str = None,
    blob_container_name: str = None,
    storage_account_name: str = None,
    include_keys: bool = False,
    storage_authentication_type: str = None,
    identity: str = None,
    resource_group_name: str = None,
    login=None,
    auth_type_dataplane=None,
):
    """Start a job that exports the hub's devices to a blob container.

    When `blob_container_uri` is not given it is generated from the storage
    account and container names. `storage_authentication_type` is deprecated:
    supplying it only logs warnings. Service `CloudError`s are routed to
    `handle_service_exception`.
    """
    if blob_container_uri is None:
        blob_container_uri = _generate_blob_container_uri(
            cmd, storage_account_name, blob_container_name, identity
        )

    if storage_authentication_type is not None:
        deprecation_notes = (
            "The parameter --sat/--storage-authentication-type has been deprecated and should not be provided. ",
            "The parameter --auth-type is now used to specify IoT Hub data access auth type instead of storage access auth type. ",
        )
        for note in deprecation_notes:
            logger.warning(note)

    service_sdk = _get_service_sdk(
        cmd, hub_name_or_hostname, resource_group_name, login, auth_type_dataplane
    )
    job_properties = _create_export_import_job_properties(
        job_type=JobType.exportDevices.value,
        output_blob_container_uri=blob_container_uri,
        include_keys=include_keys,
        identity=identity
    )

    try:
        return service_sdk.jobs.create_import_export_job(job_properties)
    except CloudError as e:
        handle_service_exception(e)


def iot_device_import(
    cmd,
    hub_name_or_hostname: str = None,
    input_blob_container_uri: str = None,
    input_blob_container_name: str = None,
    input_storage_account_name: str = None,
    output_blob_container_uri: str = None,
    output_blob_container_name: str = None,
    output_storage_account_name: str = None,
    storage_authentication_type: str = None,
    resource_group_name: str = None,
    identity: str = None,
    login=None,
    auth_type_dataplane=None,
):
    """Start a job that imports devices into the hub from a blob container.

    Input and output container URIs that are not given are generated from
    the matching storage account and container names.
    `storage_authentication_type` is deprecated: supplying it only logs
    warnings. Service `CloudError`s are routed to `handle_service_exception`.
    """
    if input_blob_container_uri is None:
        input_blob_container_uri = _generate_blob_container_uri(
            cmd, input_storage_account_name, input_blob_container_name, identity
        )
    if output_blob_container_uri is None:
        output_blob_container_uri = _generate_blob_container_uri(
            cmd, output_storage_account_name, output_blob_container_name, identity
        )

    if storage_authentication_type is not None:
        deprecation_notes = (
            "The parameter --sat/--storage-authentication-type has been deprecated and should not be provided. ",
            "The parameter --auth-type is now used to specify IoT Hub data access auth type instead of storage access auth type. ",
        )
        for note in deprecation_notes:
            logger.warning(note)

    service_sdk = _get_service_sdk(
        cmd, hub_name_or_hostname, resource_group_name, login, auth_type_dataplane
    )
    job_properties = _create_export_import_job_properties(
        job_type=JobType.importDevices.value,
        input_blob_container_uri=input_blob_container_uri,
        output_blob_container_uri=output_blob_container_uri,
        identity=identity
    )

    try:
        return service_sdk.jobs.create_import_export_job(job_properties)
    except CloudError as e:
        handle_service_exception(e)


def iot_hub_monitor_events(
    cmd,
    hub_name_or_hostname=None,
    device_id=None,
    interface=None,
    module_id=None,
    consumer_group="$Default",
    timeout=300,
    enqueued_time=None,
    resource_group_name=None,
    yes=False,
    properties=None,
    repair=False,
    login=None,
    content_type=None,
    device_query=None,
    message_count: Optional[int] = None,
):
    """Command entry point for monitoring hub events.

    All arguments are forwarded to `_iot_hub_monitor_events` (`interface` is
    passed as `interface_name`). A `RuntimeError` raised while monitoring is
    re-raised as `CLIInternalError`.
    """
    monitor_kwargs = {
        "hub_name_or_hostname": hub_name_or_hostname,
        "device_id": device_id,
        "interface_name": interface,
        "module_id": module_id,
        "consumer_group": consumer_group,
        "timeout": timeout,
        "enqueued_time": enqueued_time,
        "resource_group_name": resource_group_name,
        "yes": yes,
        "properties": properties,
        "repair": repair,
        "login": login,
        "content_type": content_type,
        "device_query": device_query,
        "message_count": message_count,
    }
    try:
        _iot_hub_monitor_events(cmd, **monitor_kwargs)
    except RuntimeError as e:
        raise CLIInternalError(e)


def iot_hub_monitor_feedback(
    cmd,
    hub_name_or_hostname=None,
    device_id=None,
    yes=False,
    wait_on_id=None,
    repair=False,
    resource_group_name=None,
    login=None,
    auth_type_dataplane=None,
):
    """Command entry point for monitoring hub feedback.

    `ensure_uamqp` is called with the CLI config, `yes` and `repair` before
    the hub target is resolved; monitoring itself is delegated to
    `_iot_hub_monitor_feedback`.
    """
    from azext_iot.common.deps import ensure_uamqp

    ensure_uamqp(cmd.cli_ctx.config, yes, repair)

    hub_target = IotHubDiscovery(cmd).get_target(
        resource_name=hub_name_or_hostname,
        resource_group_name=resource_group_name,
        login=login,
        auth_type=auth_type_dataplane,
    )

    return _iot_hub_monitor_feedback(
        target=hub_target, device_id=device_id, wait_on_id=wait_on_id
    )


def iot_hub_distributed_tracing_show(
    cmd,
    hub_name_or_hostname,
    device_id,
    resource_group_name=None,
    auth_type_dataplane=None,
):
    """Return the distributed tracing settings of a device.

    The device twin is fetched and validated by
    `_iot_hub_distributed_tracing_show`; its desired and reported properties
    are then summarised by `_customize_device_tracing_output`.
    """
    discovery = IotHubDiscovery(cmd)
    hub_target = discovery.get_target(
        resource_name=hub_name_or_hostname,
        resource_group_name=resource_group_name,
        auth_type=auth_type_dataplane
    )

    twin = _iot_hub_distributed_tracing_show(
        discovery=discovery, target=hub_target, device_id=device_id
    )
    twin_properties = twin["properties"]
    return _customize_device_tracing_output(
        twin["deviceId"],
        twin_properties["desired"],
        twin_properties["reported"],
    )


def _iot_hub_monitor_events(
    cmd,
    interface_name=None,
    module_id=None,
    hub_name_or_hostname=None,
    device_id=None,
    consumer_group="$Default",
    timeout=300,
    enqueued_time=None,
    resource_group_name=None,
    yes=False,
    properties=None,
    repair=False,
    login=None,
    content_type=None,
    device_query=None,
    message_count: Optional[int] = None,
):
    """Set up and start a single event monitor for an IoT Hub.

    Monitoring inputs are first normalised by `init_monitoring`. When
    `device_query` is given, the query is executed and the `deviceId` of
    every result is handed to the message handler. The hub target is
    resolved with events included, converted to an event target with the
    requested consumer group, and passed to `start_single_monitor`.
    """
    enqueued_time, properties, timeout, output, message_count = init_monitoring(
        cmd, timeout, properties, enqueued_time, repair, yes, message_count
    )

    # Device ids matched by the optional query, stored as dict keys.
    device_ids = {}
    if device_query:
        query_matches = iot_query(
            cmd, device_query, hub_name_or_hostname, None, resource_group_name, login=login
        )
        if query_matches:
            device_ids = {match["deviceId"]: True for match in query_matches}

    hub_target = IotHubDiscovery(cmd).get_target(
        resource_name=hub_name_or_hostname,
        resource_group_name=resource_group_name,
        include_events=True,
        login=login,
    )

    from azext_iot.monitor.builders import hub_target_builder
    from azext_iot.monitor.handlers import CommonHandler
    from azext_iot.monitor.telemetry import start_single_monitor
    from azext_iot.monitor.utility import generate_on_start_string
    from azext_iot.monitor.models.arguments import (
        CommonParserArguments,
        CommonHandlerArguments,
    )

    event_target = hub_target_builder.EventTargetBuilder().build_iot_hub_target(
        hub_target
    )
    event_target.add_consumer_group(consumer_group)

    start_banner = generate_on_start_string(device_id=device_id)

    message_handler = CommonHandler(
        CommonHandlerArguments(
            output=output,
            common_parser_args=CommonParserArguments(
                properties=properties, content_type=content_type
            ),
            devices=device_ids,
            device_id=device_id,
            interface_name=interface_name,
            module_id=module_id,
            max_messages=message_count,
        )
    )

    start_single_monitor(
        target=event_target,
        enqueued_time_utc=enqueued_time,
        on_start_string=start_banner,
        on_message_received=message_handler.parse_message,
        timeout=timeout,
    )


def iot_hub_distributed_tracing_update(
    cmd,
    hub_name_or_hostname,
    device_id,
    sampling_mode,
    sampling_rate,
    resource_group_name=None,
    auth_type_dataplane=None,
):
    """Update the distributed tracing settings stored in a device twin.

    The twin is fetched and validated by `_iot_hub_distributed_tracing_show`,
    the tracing section of its desired properties is set to the integer
    `sampling_rate` and a sampling mode of 1 (when `sampling_mode` is "on",
    case-insensitively) or 2 (otherwise), and the twin is then written back
    with `iot_device_twin_update`. The result is summarised by
    `_customize_device_tracing_output`.

    Raises:
        InvalidArgumentValueError: `sampling_rate` is not within 0..100.
    """
    # Validate the argument first so an invalid rate fails before any hub
    # lookup is attempted.
    rate = int(sampling_rate)
    if rate not in range(0, 101):
        raise InvalidArgumentValueError(
            "Sampling rate is a percentage, So only values from 0 to 100(inclusive) are permitted."
        )

    discovery = IotHubDiscovery(cmd)
    target = discovery.get_target(
        resource_name=hub_name_or_hostname,
        resource_group_name=resource_group_name,
        include_events=True,
        auth_type=auth_type_dataplane,
    )

    device_twin = _iot_hub_distributed_tracing_show(
        discovery=discovery, target=target, device_id=device_id
    )
    desired = device_twin["properties"]["desired"]
    if TRACING_PROPERTY not in desired:
        desired[TRACING_PROPERTY] = {}
    desired[TRACING_PROPERTY]["sampling_rate"] = rate
    desired[TRACING_PROPERTY]["sampling_mode"] = (
        1 if sampling_mode.lower() == "on" else 2
    )

    # Forward the auth type so the write uses the same data-plane auth as the
    # read above.
    result = iot_device_twin_update(
        cmd,
        device_id,
        device_twin,
        hub_name_or_hostname,
        resource_group_name,
        auth_type_dataplane=auth_type_dataplane,
    )
    return _customize_device_tracing_output(
        result.device_id, result.properties.desired, result.properties.reported
    )


def iot_hub_connection_string_show(
    cmd,
    hub_name_or_hostname=None,
    resource_group_name=None,
    policy_name="iothubowner",
    key_type=KeyType.primary.value,
    show_all=False,
    default_eventhub=False,
):
    """Show connection strings for one hub or for every discovered hub.

    With a hub name/hostname, returns `{"connectionString": ...}` for that
    hub, or None when the hub lookup yields nothing. Without one, returns a
    list of `{"name", "connectionString"}` entries for every active hub found
    in `resource_group_name`; inactive hubs and hubs whose connection string
    cannot be produced are skipped with a warning. The value is the full list
    of strings when `show_all` is set, otherwise only the first one.

    Raises:
        ResourceNotFoundError: hub discovery returned None.
    """
    discovery = IotHubDiscovery(cmd)

    def _connection_strings_for(hub):
        return _get_hub_connection_string(
            discovery, hub, policy_name, key_type, show_all, default_eventhub
        )

    if hub_name_or_hostname is not None:
        hub = discovery.find_resource(hub_name_or_hostname, resource_group_name)
        if not hub:
            return None
        conn_str = _connection_strings_for(hub)
        return {"connectionString": conn_str if show_all else conn_str[0]}

    hubs = discovery.get_resources(resource_group_name)
    if hubs is None:
        raise ResourceNotFoundError("No IoT Hub found.")

    connection_strings = []
    for hub in hubs:
        if hub.properties.state != IoTHubStateType.Active.value:
            logger.warning(
                f"Warning: The IoT Hub {hub.name} in resource group "
                + f"{hub.additional_properties['resourcegroup']} is skipped "
                + "because the hub is not active."
            )
            continue

        try:
            hub_strings = _connection_strings_for(hub)
            connection_strings.append(
                {
                    "name": hub.name,
                    "connectionString": hub_strings if show_all else hub_strings[0],
                }
            )
        except Exception:
            # Best effort: report the hub and carry on with the remaining ones.
            logger.warning(
                f"Warning: The IoT Hub {hub.name} in resource group "
                + f"{hub.additional_properties['resourcegroup']} does "
                + f"not have the target policy {policy_name}."
            )
    return connection_strings


def _get_hub_connection_string(
    discovery, hub, policy_name, key_type, show_all, default_eventhub
):
    """Return the list of connection strings for `hub`.

    Policies are either all of the hub's policies (`show_all`) or only the
    one named `policy_name`. With `default_eventhub`, strings for the hub's
    "events" endpoint are produced and only policies whose rights include
    "serviceconnect" are used; otherwise hub strings are produced for every
    policy. The secondary key is used when `key_type` equals
    `KeyType.secondary.value`, the primary key otherwise.
    """
    resource_group = hub.additional_properties["resourcegroup"]
    if show_all:
        policies = list(discovery.get_policies(hub.name, resource_group))
    else:
        policies = [discovery.find_policy(hub.name, resource_group, policy_name)]

    def _selected_key(policy):
        if key_type == KeyType.secondary.value:
            return policy.secondary_key
        return policy.primary_key

    def _rights_text(policy):
        # Rights may be an enum member or a plain string.
        rights = policy.rights
        if isinstance(rights, (Enum, EnumMeta)):
            return rights.value.lower()
        return rights.lower()

    if default_eventhub:
        events_endpoint = hub.properties.event_hub_endpoints["events"].endpoint
        events_path = hub.properties.event_hub_endpoints["events"].path
        eventhub_strings = []
        for policy in policies:
            if "serviceconnect" not in _rights_text(policy):
                continue
            eventhub_strings.append(
                "Endpoint={};SharedAccessKeyName={};SharedAccessKey={};EntityPath={}".format(
                    events_endpoint,
                    policy.key_name,
                    _selected_key(policy),
                    events_path,
                )
            )
        return eventhub_strings

    hostname = hub.properties.host_name
    hub_strings = []
    for policy in policies:
        hub_strings.append(
            "HostName={};SharedAccessKeyName={};SharedAccessKey={}".format(
                hostname, policy.key_name, _selected_key(policy)
            )
        )
    return hub_strings


def _iot_hub_monitor_feedback(target, device_id, wait_on_id):
    """Run the feedback monitor for `target` with a token duration of 3600."""
    from azext_iot.monitor import event as monitor_event

    monitor_event.monitor_feedback(
        target=target,
        device_id=device_id,
        wait_on_id=wait_on_id,
        token_duration=3600,
    )


def _iot_hub_distributed_tracing_show(discovery, target, device_id):
    """Fetch the twin of `device_id` and return it once tracing validation passes."""
    twin = _iot_device_twin_show(target=target, device_id=device_id)
    # Raises when tracing is not supported for this hub or device.
    _validate_device_tracing(discovery, target, twin)
    return twin


def _validate_device_tracing(discovery, target, device_twin):
    """Check that distributed tracing can be used for this hub and device.

    When `target` lacks a location or SKU tier, both are looked up from the
    hub resource and stored on `target`.

    Raises:
        ClientRequestError: the hub location is not in
            `TRACING_ALLOWED_FOR_LOCATION`, the SKU tier differs from
            `TRACING_ALLOWED_FOR_SKU`, or the twin marks the device as an
            IoT Edge device.
    """
    if not (target.get("location") and target.get("sku_tier")):
        hub_resource = discovery.find_resource(target["name"])
        target["location"] = hub_resource.location
        # The tier may be an enum member or a plain string.
        tier = hub_resource.sku.tier
        target["sku_tier"] = tier.value if isinstance(tier, (Enum, EnumMeta)) else tier

    location = target["location"]
    if location.lower() not in TRACING_ALLOWED_FOR_LOCATION:
        raise ClientRequestError(
            'Distributed tracing isn\'t supported for the hub located at "{}" location.'.format(
                location
            )
        )

    sku_tier = target["sku_tier"]
    if sku_tier.lower() != TRACING_ALLOWED_FOR_SKU:
        raise ClientRequestError(
            'Distributed tracing isn\'t supported for the hub belongs to "{}" sku tier.'.format(
                sku_tier
            )
        )

    if device_twin["capabilities"]["iotEdge"]:
        raise ClientRequestError(
            'The device "{}" should be a non-edge device.'.format(device_twin["deviceId"])
        )


def _customize_device_tracing_output(device_id, desired, reported):
    """Summarise a device's distributed tracing settings.

    Returns an empty dict when the desired properties hold no tracing
    section. Otherwise returns `deviceId`, `samplingMode` ("enabled" when the
    desired mode is 1, "disabled" otherwise), `samplingRate` (the desired
    rate followed by "%") and `isSynced`.

    `isSynced` is True only when the reported tracing section carries both
    `sampling_mode` and `sampling_rate` entries whose `value` equals the
    desired setting. A reported section that is missing either entry, or
    whose entries are not dicts, is treated as not synced instead of raising.
    """
    output = {}
    desired_tracing = desired.get(TRACING_PROPERTY, None)
    if not desired_tracing:
        return output

    output["deviceId"] = device_id
    output["samplingMode"] = (
        "enabled" if desired_tracing.get("sampling_mode") == 1 else "disabled"
    )
    output["samplingRate"] = "{}%".format(desired_tracing.get("sampling_rate"))
    output["isSynced"] = False

    reported_tracing = reported.get(TRACING_PROPERTY, None)
    if not reported_tracing:
        return output

    # Devices may report a partial or differently shaped section; only
    # compare when both entries have the expected {"value": ...} form.
    reported_mode = reported_tracing.get("sampling_mode")
    reported_rate = reported_tracing.get("sampling_rate")
    if not (isinstance(reported_mode, dict) and isinstance(reported_rate, dict)):
        return output

    if desired_tracing.get("sampling_mode") == reported_mode.get(
        "value", None
    ) and desired_tracing.get("sampling_rate") == reported_rate.get("value", None):
        output["isSynced"] = True
    return output
