in azext_iot/iothub/providers/state.py [0:0]
def upload_hub_from_dict(self, hub_state: dict, hub_aspects: List[str]):
    """Apply a saved hub state to the IoT Hub named ``self.hub_name``.

    The state is applied in three stages, each gated on the aspect being
    requested in ``hub_aspects`` AND present (truthy) in ``hub_state``:

    1. Control plane (``"arm"``): deploy the ARM template through
       ``deployment group create``; this creates the hub when
       ``self.target`` is unset and updates it otherwise.
    2. Configurations (``"configurations"``): ADM configurations, then edge
       deployments, then layered edge deployments.
    3. Devices (``"devices"``): identities, twins, modules and finally the
       parent/child relationships.

    Data plane failures (``AzCLIError``) are logged per item and do not
    abort the upload.

    Args:
        hub_state: Saved hub state, keyed by ``"arm"``, ``"configurations"``
            and ``"devices"``. The ``"arm"`` entry is modified in place
            before being deployed.
        hub_aspects: Names of the aspects to upload. NOTE: this list is
            mutated in place; every aspect that gets processed is removed,
            and whatever remains is reported in a warning at the end.

    Raises:
        BadRequestError: If a new hub would be created with an identity
            based routing endpoint that has no identity, if the ARM
            deployment fails, or if there is no target hub once the control
            plane stage is done.
    """
    # Control plane
    if HubAspects.Arm.value in hub_aspects and hub_state.get("arm"):
        hub_aspects.remove(HubAspects.Arm.value)
        # The first template resource is treated as the hub itself.
        hub_resource = hub_state["arm"]["resources"][0]
        hub_resource["name"] = self.hub_name
        if self.target:
            # remove/overwrite attributes that cannot be changed
            current_hub_resource = self.discovery.find_resource(self.hub_name, self.rg)
            if not self.rg:
                self.rg = current_hub_resource.additional_properties["resourcegroup"]
            # location
            hub_resource["location"] = current_hub_resource.location
            # sku
            hub_resource["sku"] = current_hub_resource.sku.serialize()
            # event hub partitions
            partition_count = current_hub_resource.properties.event_hub_endpoints["events"].partition_count
            hub_resource["properties"]["eventHubEndpoints"]["events"]["partitionCount"] = partition_count
            # enable data residency
            if (
                hasattr(current_hub_resource.properties, "enable_data_residency")
                and "enableDataResidency" in hub_resource["properties"]
            ):
                hub_resource["properties"]["enableDataResidency"] = current_hub_resource.properties.enable_data_residency
            # features - hub takes care of this but we will do this just incase
            hub_resource["properties"]["features"] = current_hub_resource.properties.features
            # TODO check for other props and add them as they pop up
        else:
            # If there is a system assigned identity endpoint, the ARM deployment may stall rather than failing
            # outright. So warn and fail before deployment.
            # `.get` is used so endpoints that omit authenticationType are simply treated as not identity based.
            identity_endpoints = [
                endpoint["name"]
                for endpoint_list in hub_resource["properties"]["routing"]["endpoints"].values()
                for endpoint in endpoint_list
                if endpoint.get("authenticationType") == AuthenticationType.IdentityBased.value
                and not endpoint.get("identity")
            ]
            if identity_endpoints:
                raise BadRequestError(
                    usr_msgs.FAILED_ARM_IDENTITY_ENDPOINT_MSG.format(self.hub_name, identity_endpoints)
                )

        # Only certificate resources are deployed alongside the hub; every other resource is dropped and
        # the user is warned about it.
        extra_resources = hub_state["arm"]["resources"][1:]
        hub_certs = [res for res in extra_resources if res["type"].endswith("certificates")]
        if len(hub_certs) < len(extra_resources):
            logger.warning(usr_msgs.PRIVATE_ENDPOINT_WARNING_MSG)
        for res in hub_certs:
            # Re-home the certificate under the target hub name.
            res["name"] = self.hub_name + "/" + res["name"].split("/")[1]
            # NOTE(review): assumes dependsOn[0] is a quoted expression whose second quoted argument is the
            # hub name (index 3 after splitting on "'") - confirm against the exported template format.
            depends_on = res["dependsOn"][0].split("'")
            depends_on[3] = self.hub_name
            res["dependsOn"][0] = "'".join(depends_on)
        hub_state["arm"]["resources"] = [hub_resource] + hub_certs

        state_file = f"arm_deployment-{self.hub_name}.json"
        try:
            with open(state_file, "w", encoding='utf-8') as f:
                json.dump(hub_state["arm"], f)
            print(f"Starting Arm Deployment for IoT Hub {self.hub_name}.")
            arm_result = cli.invoke(
                f"deployment group create --template-file {state_file} -g {self.rg}"
            )
        finally:
            # Always clean up the template file, even when the deployment call raises.
            if os.path.exists(state_file):
                os.remove(state_file)

        if not arm_result.success():
            raise BadRequestError(usr_msgs.FAILED_ARM_MSG.format(self.hub_name))

        if not self.target:
            self.target = self.discovery.get_target(
                hub_resource["name"],
                resource_group_name=arm_result.as_json()["resourceGroup"]
            )
            print(usr_msgs.CREATE_IOT_HUB_MSG.format(self.hub_name))
        else:
            print(usr_msgs.UPDATED_IOT_HUB_MSG.format(self.hub_name))

    # block if the arm aspect is specified, the state file does not have the arm aspect, and the
    # hub does not exist
    if not self.target:
        raise BadRequestError(usr_msgs.HUB_NOT_CREATED_MSG.format(self.hub_name))

    # Data plane
    # upload configurations
    if HubAspects.Configurations.value in hub_aspects and hub_state.get("configurations"):
        hub_aspects.remove(HubAspects.Configurations.value)
        configs = hub_state["configurations"]["admConfigurations"]
        edge_deployments = hub_state["configurations"]["edgeDeployments"]
        # The context manager closes the progress bar once all configurations are processed.
        with tqdm(
            total=len(configs) + len(edge_deployments),
            desc=usr_msgs.UPLOAD_CONFIGURATIONS_DESC,
            ascii=" #"
        ) as config_progress:
            for config_id, config_obj in configs.items():
                try:
                    _iot_hub_configuration_create(
                        target=self.target,
                        config_id=config_id,
                        content=json.dumps(config_obj["content"]),
                        target_condition=config_obj["targetCondition"],
                        priority=config_obj["priority"],
                        labels=json.dumps(config_obj["labels"]),
                        metrics=json.dumps(config_obj["metrics"])
                    )
                except AzCLIError as e:
                    logger.error(usr_msgs.UPLOAD_ADM_CONFIG_ERROR_MSG.format(config_id, e))
                config_progress.update(1)

            layered_configs = {}
            for config_id, config_obj in edge_deployments.items():
                # A deployment without "properties.desired" for $edgeAgent is treated as layered and
                # deferred; it is counted in the progress bar when it is actually uploaded below.
                if "properties.desired" not in config_obj["content"]["modulesContent"]["$edgeAgent"]:
                    layered_configs[config_id] = config_obj
                    continue
                try:
                    _iot_hub_configuration_create(
                        target=self.target,
                        config_id=config_id,
                        content=json.dumps(config_obj["content"]),
                        target_condition=config_obj["targetCondition"],
                        priority=config_obj["priority"],
                        labels=json.dumps(config_obj["labels"]),
                        metrics=json.dumps(config_obj["metrics"]),
                        config_type=ConfigType.edge
                    )
                except AzCLIError as e:
                    logger.error(usr_msgs.UPLOAD_EDGE_DEPLOYMENT_ERROR_MSG.format(config_id, e))
                config_progress.update(1)

            # Do layered configs after edge configs.
            # TODO: create an algo to figure out order
            for config_id, config_obj in layered_configs.items():
                try:
                    _iot_hub_configuration_create(
                        target=self.target,
                        config_id=config_id,
                        content=json.dumps(config_obj["content"]),
                        target_condition=config_obj["targetCondition"],
                        priority=config_obj["priority"],
                        labels=json.dumps(config_obj["labels"]),
                        metrics=json.dumps(config_obj["metrics"]),
                        # Pass the type explicitly instead of reusing a variable left over from the
                        # previous loop, which could hold ConfigType.edge.
                        config_type=ConfigType.layered
                    )
                except AzCLIError as e:
                    logger.error(usr_msgs.UPLOAD_EDGE_DEPLOYMENT_ERROR_MSG.format(config_id, e))
                config_progress.update(1)

    # Devices
    if HubAspects.Devices.value in hub_aspects and hub_state.get("devices"):
        hub_aspects.remove(HubAspects.Devices.value)
        child_to_parent = {}
        for device_id, device_obj in tqdm(hub_state["devices"].items(), desc=usr_msgs.UPLOAD_DEVICE_MSG, ascii=" #"):
            # upload device identity and twin; a failure in either skips the rest of this device
            try:
                self.upload_device_identity(device_id, device_obj["identity"])
            except AzCLIError as e:
                logger.error(usr_msgs.UPLOAD_DEVICE_IDENTITY_MSG.format(device_id, e))
                continue
            try:
                _iot_device_twin_update(
                    target=self.target, device_id=device_id, parameters=device_obj["twin"]
                )
            except AzCLIError as e:
                logger.error(usr_msgs.UPLOAD_DEVICE_TWIN_MSG.format(device_id, e))
                continue

            edge_modules = {}
            for module_id, module_obj in device_obj.get("modules", {}).items():
                # upload will fail for modules that start with $ or have no auth
                if module_id.startswith("$") or module_obj["identity"]["authentication"]["type"] == "none":
                    edge_modules[module_id] = {
                        "properties.desired": module_obj["twin"]["properties"]["desired"]
                    }
                    continue

                try:
                    self.upload_module_identity(device_id, module_id, module_obj["identity"])
                except AzCLIError as e:
                    logger.error(usr_msgs.UPLOAD_DEVICE_MODULE_IDENTITY_MSG.format(module_id, device_id, e))
                    # Without an identity there is no module to attach a twin to.
                    continue
                try:
                    _iot_device_module_twin_update(
                        target=self.target,
                        device_id=device_id,
                        module_id=module_id,
                        parameters=module_obj["twin"]
                    )
                except AzCLIError as e:
                    # NOTE(review): this formats the *device* twin message with three arguments, unlike the
                    # two-argument use above - confirm whether a module specific message was intended.
                    logger.error(usr_msgs.UPLOAD_DEVICE_TWIN_MSG.format(module_id, device_id, e))

            if edge_modules:
                try:
                    _iot_edge_set_modules(
                        target=self.target, device_id=device_id, content=json.dumps({"modulesContent": edge_modules})
                    )
                except AzCLIError as e:
                    logger.error(usr_msgs.UPLOAD_EDGE_MODULE_MSG.format(device_id, e))
                    # Skip recording the parent for a device whose edge modules failed to upload.
                    continue

            if device_obj.get("parent"):
                child_to_parent[device_id] = device_obj["parent"]

        # set parent-child relationships after all devices are created
        for device_id, parent_id in child_to_parent.items():
            try:
                _iot_device_set_parent(target=self.target, parent_id=parent_id, device_id=device_id)
            except AzCLIError as e:
                logger.error(usr_msgs.UPLOAD_DEVICE_RELATIONSHIP_MSG.format(parent_id, device_id, e))

    # Leftover aspects
    if hub_aspects:
        logger.warning(usr_msgs.MISSING_HUB_ASPECTS_MSG.format(', '.join(hub_aspects)))