azext_iot/iothub/providers/helpers/edge_device_config.py (428 lines of code) (raw):

# coding=utf-8 # -------------------------------------------------------------------------------------------- # Copyright (c) Microsoft Corporation. All rights reserved. # Licensed under the MIT License. See License.txt in the project root for license information. # -------------------------------------------------------------------------------------------- """This module defines common values and functions for processing edge device configurations""" from pathlib import PurePath from os import getcwd from typing import Optional, List, Dict, Any from azext_iot.common.fileops import write_content_to_file from azext_iot.common.certops import create_self_signed_certificate, load_ca_cert_info from azext_iot.common.shared import ( ConfigType, DeviceAuthType, ) from azext_iot.iothub.common import ( EdgeDevicesConfig, EdgeDeviceConfig, EdgeContainerAuth, ) from azext_iot.common.utility import process_json_arg, process_toml_arg, assemble_nargs_to_dict from azure.cli.core.azclierror import ( CLIInternalError, FileOperationError, InvalidArgumentValueError, RequiredArgumentMissingError, ) from azext_iot.operations.hub import _process_config_content from azext_iot.sdk.iothub.service.models import ConfigurationContent from knack.log import get_logger logger = get_logger(__name__) MAX_DEVICE_SCOPE_RETRIES = 5 DEVICE_CONFIG_SCHEMA_VALID_VERSIONS: Dict[str, Any] = {} DEVICE_CONFIG_SCHEMA_VALID_VERSIONS["1.0"] = { "type": "object", "required": ["configVersion", "iotHub", "edgeConfiguration", "edgeDevices"], "properties": { "configVersion": {"type": "string"}, "iotHub": { "type": "object", "required": ["authenticationMethod"], "properties": { "authenticationMethod": { "type": "string", "enum": ["symmetricKey", "x509Certificate"] }, } }, "certificates": { "type": "object", "required": ["rootCACertPath", "rootCACertKeyPath"], "properties": { "rootCACertPath": {"type": "string"}, "rootCACertKeyPath": {"type": "string"} } }, "edgeConfiguration": { "type": "object", "properties": { "templateConfigPath": {"type": "string"}, "defaultEdgeAgent": {"type": "string"} }, "required": ["defaultEdgeAgent"] }, "edgeDevices": { "type": "array", "items": {"$ref": "#/$defs/edgeDevice"} }, }, "$defs": { "edgeDevice": { "type": "object", "properties": { "deviceId": {"type": "string"}, "hostname": {"type": "string"}, "edgeAgent": {"type": "string"}, "deployment": {"type": "string"}, "containerAuth": { "type": "object", "properties": { "serverAddress": {"type": "string"}, "username": {"type": "string"}, "password": {"type": "string"} } }, "children": { "type": "array", "items": {"$ref": "#/$defs/edgeDevice"}, "minItems": 1, } }, "required": ["deviceId"] } } } # Edge device TOML default values DEVICE_CONFIG_TOML = { "auto_reprovisioning_mode": "Dynamic", "hostname": "", "provisioning": { "device_id": "", "iothub_hostname": "", "source": "manual", "authentication": { "device_id_pk": "", "method": "sas", "trust_bundle_cert": "", }, }, "edge_ca": {"cert": "file:///", "pk": "file:///"}, "agent": {"config": {"image": ""}, "name": "edgeAgent", "type": "docker"}, "connect": { "management_uri": "unix:///var/run/iotedge/mgmt.sock", "workload_uri": "unix:///var/run/iotedge/workload.sock", }, "listen": { "management_uri": "fd://aziot-edged.mgmt.socket", "workload_uri": "fd://aziot-edged.workload.socket", }, "moby_runtime": {"network": "azure-iot-edge", "uri": "unix:///var/run/docker.sock"}, } EDGE_ROOT_CERTIFICATE_FILENAME = "iotedge_config_cli_root.pem" EDGE_CONFIG_SCRIPT_HEADERS = """ # This script will attempt to configure a pre-installed iotedge as a nested node. # It must be run as sudo, and will modify the ca device_id="{}" cp config.toml /etc/aziot/config.toml """ EDGE_CONFIG_SCRIPT_HOSTNAME = """ # ======================= Set Hostname ======================================= read -p "Enter the hostname to use: " hostname if [ -z "$hostname" ] then echo "Invalid hostname $hostname" exit 1 fi sed -i "s/{{HOSTNAME}}/$hostname/" /etc/aziot/config.toml """ EDGE_CONFIG_SCRIPT_PARENT_HOSTNAME = """ # ======================= Set Parent Hostname ======================================= read -p "Enter the parent hostname to use: " parent_hostname if [ -z "$parent_hostname" ] then echo "Invalid parent hostname $parent_hostname" exit 1 fi sed -i "s/{{PARENT_HOSTNAME}}/$parent_hostname/" /etc/aziot/config.toml """ EDGE_CONFIG_SCRIPT_CA_CERTS = f""" # ======================= Install nested root CA ======================================= if [ -f /etc/os-release ] then . /etc/os-release if [[ "$NAME" == "Common Base Linux Mariner"* ]]; then cp {EDGE_ROOT_CERTIFICATE_FILENAME} /etc/pki/ca-trust/source/anchors/{EDGE_ROOT_CERTIFICATE_FILENAME}.crt update-ca-trust else cp {EDGE_ROOT_CERTIFICATE_FILENAME} /usr/local/share/ca-certificates/{EDGE_ROOT_CERTIFICATE_FILENAME}.crt update-ca-certificates fi else cp {EDGE_ROOT_CERTIFICATE_FILENAME} /usr/local/share/ca-certificates/{EDGE_ROOT_CERTIFICATE_FILENAME}.crt update-ca-certificates fi systemctl restart docker # ======================= Copy device certs ======================================= cert_dir="/etc/aziot/certificates" mkdir -p $cert_dir cp "{EDGE_ROOT_CERTIFICATE_FILENAME}" "$cert_dir/{EDGE_ROOT_CERTIFICATE_FILENAME}" cp "$device_id.full-chain.cert.pem" "$cert_dir/$device_id.full-chain.cert.pem" cp "$device_id.key.pem" "$cert_dir/$device_id.key.pem" """ EDGE_CONFIG_SCRIPT_HUB_AUTH_CERTS = """ # ======================= Copy hub auth certs ======================================= cert_dir="/etc/aziot/certificates" mkdir -p $cert_dir cp "$device_id.hub-auth-cert.pem" "$cert_dir/$device_id.hub-auth-cert.pem" cp "$device_id.hub-auth-key.pem" "$cert_dir/$device_id.hub-auth-key.pem" """ EDGE_CONFIG_SCRIPT_APPLY = """ # ======================= Read User Input ======================================= iotedge config apply -c /etc/aziot/config.toml echo "To check the edge runtime status, run 'iotedge system status'. To validate the configuration, run 'sudo iotedge check'" """ EDGE_SUPPORTED_OS_LINK = "https://aka.ms/iotedge-supported-systems" EDGE_LINUX_TUTORIAL_LINK = "https://aka.ms/iotedge-provision-linux-device" EDGE_WINDOWS_TUTORIAL_LINK = "https://aka.ms/iotedge-provision-windows" DEVICE_README = f""" # Prerequisites Each device must have IoT Edge (must be v1.2 or later) installed. Pick a [supported OS]({EDGE_SUPPORTED_OS_LINK}) and follow the corresponding tutorial to install Azure IoT Edge: - [Linux on Windows]({EDGE_WINDOWS_TUTORIAL_LINK}) - [Linux]({EDGE_LINUX_TUTORIAL_LINK}) # Steps 1. Copy the bundle for each created device (device_id.tgz) onto the device. 2. Extract the bundle file by running following command: ```Extract tar zxvf ~/<PATH_TO_BUNDLE>/[[device-id]].tgz ``` 3. Run the install script: ```Run sudo bash ./install.sh ``` 4. If hostnames were not provided in the configuration file, the script will prompt for hostnames. - Follow the prompt by entering the device and/or parent hostname (FQDN or IP address). - On the parent device, it will prompt for its own hostname. - On a child device, it may prompt the hostname of both the child and parent devices. """ EDGE_ROOT_CERTIFICATE_SUBJECT = "Azure_IoT_CLI_Extension_Cert" def create_edge_device_config( hub_hostname: str, device_id: str, auth_method: DeviceAuthType, device_config: EdgeDeviceConfig, default_edge_agent: str, device_config_path: Optional[str] = None, device_pk: Optional[str] = None, output_path: Optional[str] = None, ): # load default device TOML object or custom path device_toml = ( process_toml_arg(device_config_path) if device_config_path else DEVICE_CONFIG_TOML ) device_toml[ "trust_bundle_cert" ] = f"file:///etc/aziot/certificates/{EDGE_ROOT_CERTIFICATE_FILENAME}" # Dynamic is the default auto reprovisioning mode, but respect config settings device_toml["auto_reprovisioning_mode"] = getattr(device_toml, "auto_reprovisioning_mode", "Dynamic") device_toml["hostname"] = ( device_config.hostname if device_config.hostname else "{{HOSTNAME}}" ) if device_config.parent_id: device_toml["parent_hostname"] = ( device_config.parent_hostname if device_config.parent_hostname else "{{PARENT_HOSTNAME}}" ) device_toml["provisioning"] = { "device_id": device_id, "iothub_hostname": hub_hostname, "source": "manual", "authentication": {"device_id_pk": {"value": device_pk}, "method": "sas"} if auth_method == DeviceAuthType.shared_private_key.value else { "method": "x509", "identity_cert": f"file:///etc/aziot/certificates/{device_id}.hub-auth-cert.pem", "identity_pk": f"file:///etc/aziot/certificates/{device_id}.hub-auth-key.pem", }, } device_toml["edge_ca"] = { "cert": f"file:///etc/aziot/certificates/{device_id}.full-chain.cert.pem", "pk": f"file:///etc/aziot/certificates/{device_id}.key.pem", } device_toml["agent"]["config"] = { "image": device_config.edge_agent or default_edge_agent or '', "auth": { "serveraddress": device_config.container_auth.serveraddress, "username": device_config.container_auth.username, "password": device_config.container_auth.password, } if device_config.container_auth else {}, } if output_path: import tomli_w write_content_to_file( tomli_w.dumps(device_toml), output_path, "config.toml", overwrite=True, ) return device_toml def process_edge_devices_config_file_content( content: dict, config_path: Optional[str] = None, override_auth_type: Optional[str] = None, override_root_cert_path: Optional[str] = None, override_root_key_path: Optional[str] = None, override_root_password: Optional[str] = None, override_default_edge_agent: Optional[str] = None, override_device_config_template: Optional[str] = None, ) -> EdgeDevicesConfig: """ Process edge config file schema dictionary """ # Use current directory if no config file path config_path = config_path or getcwd() # Warn about override values for value, name in [ (override_auth_type, "Authentication Type"), (override_root_cert_path, "Root certificate"), (override_root_key_path, "Root certificate key"), (override_default_edge_agent, "Default edge agent"), (override_device_config_template, "Device config template"), ]: if value: logger.info( f"Overriding configuration file property `{name}` " f"with command argument value: `{value}`" ) version = content.get("configVersion", None) if not version: raise InvalidArgumentValueError("'configVersion' property missing from device configuration file.") from jsonschema import validate from jsonschema.exceptions import ValidationError try: validate(content, DEVICE_CONFIG_SCHEMA_VALID_VERSIONS[version]) except ValidationError as err: raise InvalidArgumentValueError(f"Invalid devices config file schema:\n{err.message}") hub_config = content.get("iotHub", {}) devices_config = content.get("edgeDevices", []) # edge root CA root_cert = None certificates = content.get("certificates", None) if certificates or any( [override_root_cert_path, override_root_key_path, override_root_password] ): root_ca_cert = override_root_cert_path or certificates.get( "rootCACertPath", None ) root_ca_key = override_root_key_path or certificates.get( "rootCACertKeyPath", None ) if not all([root_ca_cert, root_ca_key]): raise InvalidArgumentValueError( "Please check your config file to ensure values are provided " "for both `rootCACertPath` and `rootCACertKeyPath`." ) root_cert = load_ca_cert_info( root_ca_cert, root_ca_key, password=override_root_password ) else: root_cert = create_self_signed_certificate( subject=EDGE_ROOT_CERTIFICATE_SUBJECT, key_size=4096, v3_extensions=True ) # device auth # default to symmetric key device_authentication_method = DeviceAuthType.shared_private_key.value auth_value = hub_config.get("authenticationMethod", None) if override_auth_type: device_authentication_method = override_auth_type else: device_authentication_method = ( DeviceAuthType.x509_thumbprint.value if auth_value == "x509Certificate" else DeviceAuthType.shared_private_key.value ) # edge config edge_config = content.get("edgeConfiguration", None) if edge_config or any( [override_default_edge_agent, override_device_config_template] ): # do not use path relative to config file if overridden from CLI context if override_device_config_template: template_config_path = override_device_config_template else: template_config_path = edge_config.get("templateConfigPath", None) if template_config_path: # relative path to config file to device.toml template_config_path = PurePath(config_path, template_config_path).as_posix() default_edge_agent = override_default_edge_agent or edge_config.get( "defaultEdgeAgent", None ) all_devices = [] def _process_edge_config_device(device: dict, parent_id=None, parent_hostname=None): device_id = device.get("deviceId", None) if not device_id: raise InvalidArgumentValueError( "A device parameter is missing required attribute 'device_id'" ) deployment = device.get("deployment", None) if deployment: # relative path from config file to deployment.json deployment = PurePath(config_path, deployment).as_posix() deployment = try_parse_valid_deployment_config(deployment) child_devices = device.get("children", []) container_auth = device.get("containerAuth", {}) hostname = device.get("hostname", None) edge_agent = device.get("edgeAgent", None) device_config = EdgeDeviceConfig( device_id=device_id, deployment=deployment, parent_id=parent_id, parent_hostname=parent_hostname, container_auth=EdgeContainerAuth( serveraddress=container_auth.get("serverAddress", None), username=container_auth.get("username", None), password=container_auth.get("password", None), ) if container_auth else None, hostname=hostname, edge_agent=edge_agent, ) all_devices.append(device_config) for child_device in child_devices: _process_edge_config_device( child_device, parent_id=device_id, parent_hostname=hostname ) for device in devices_config: _process_edge_config_device(device) return EdgeDevicesConfig( version=version, auth_method=device_authentication_method, root_cert=root_cert, devices=all_devices, template_config_path=template_config_path, default_edge_agent=default_edge_agent, ) def create_edge_device_config_script( device_id: str, hub_auth: bool = False, hostname: Optional[str] = None, has_parent: bool = False, parent_hostname: Optional[str] = None, ): return "\n".join( [EDGE_CONFIG_SCRIPT_HEADERS.format(device_id)] + ([EDGE_CONFIG_SCRIPT_HOSTNAME] if not hostname else []) + ( [EDGE_CONFIG_SCRIPT_PARENT_HOSTNAME] if (has_parent and not parent_hostname) else [] ) + [EDGE_CONFIG_SCRIPT_CA_CERTS] + ([EDGE_CONFIG_SCRIPT_HUB_AUTH_CERTS] if hub_auth else []) + [EDGE_CONFIG_SCRIPT_APPLY] ) def try_parse_valid_deployment_config(deployment_path: str): try: deployment_content = process_json_arg( deployment_path, argument_name="deployment" ) processed_content = _process_config_content( deployment_content, config_type=ConfigType.edge ) return ConfigurationContent(**processed_content) except CLIInternalError: raise FileOperationError( f"Please ensure a deployment file exists at path: '{deployment_path}'" ) except Exception as ex: logger.warning(f"Error processing config file at '{deployment_path}'") raise InvalidArgumentValueError(ex) def process_edge_devices_config_args( device_args: List[List[str]], auth_type: str, default_edge_agent: Optional[str] = None, device_config_template: Optional[str] = None, root_cert_path: Optional[str] = None, root_key_path: Optional[str] = None, root_cert_password: Optional[str] = None, ) -> EdgeDevicesConfig: # raise error if only key or cert provided if (root_cert_path is not None) ^ (root_key_path is not None): raise RequiredArgumentMissingError( "You must provide a path to both the root cert public and private keys." ) # create cert if one isn't provided root_cert = ( load_ca_cert_info(root_cert_path, root_key_path, root_cert_password) if all([root_cert_path, root_key_path]) else create_self_signed_certificate( subject=EDGE_ROOT_CERTIFICATE_SUBJECT, key_size=4096, v3_extensions=True ) ) config = EdgeDevicesConfig( version="1.0", auth_method=(auth_type or DeviceAuthType.shared_private_key.value), default_edge_agent=default_edge_agent, template_config_path=device_config_template, devices=[], root_cert=root_cert, ) # Process --device arguments all_devices: Dict[str, Dict[str, str]] = {} for device_input in device_args: # assemble device params from nArgs strings device_dict = assemble_nargs_to_dict(device_input) device_id = device_dict.get("id", None) if not device_id: raise InvalidArgumentValueError( "A device argument is missing required parameter 'id'" ) if all_devices.get(device_id, None): raise InvalidArgumentValueError( f"Duplicate deviceId '{device_id}' detected" ) all_devices[device_id] = device_dict for device_id in all_devices: device_dict = all_devices[device_id] deployment = device_dict.get("deployment", None) if deployment: deployment = try_parse_valid_deployment_config(deployment) parent_id = device_dict.get("parent", None) parent_hostname = None if parent_id: parent = all_devices.get(parent_id, {}) parent_hostname = parent.get("hostname", None) hostname = device_dict.get("hostname", None) edge_agent = device_dict.get("edge_agent", None) container_auth_arg = device_dict.get("container_auth", "{}") container_auth_obj = process_json_arg(container_auth_arg) container_auth = ( EdgeContainerAuth( serveraddress=container_auth_obj.get("serverAddress", None), username=container_auth_obj.get("username", None), password=container_auth_obj.get("password", None), ) if container_auth_obj else None ) device_config = EdgeDeviceConfig( device_id=device_id, deployment=deployment, parent_id=parent_id, hostname=hostname, parent_hostname=parent_hostname, edge_agent=edge_agent, container_auth=container_auth, ) config.devices.append(device_config) return config