in azext_iot/iothub/providers/state.py [0:0]
def process_hub_to_dict(self, target: Dict[str, str], hub_aspects: List[str]) -> dict:
"""Returns a dictionary containing the hub state
Full structure:
{
"arm": full_arm_template,
"configurations": {
"admConfigurations": {
"config_id": { config_properties }
},
"edgeDeployments": {
"config_id": { config_properties }
}
}
"devices": {
"device_id": {
"identity": { identity_properties (and properties shared with twin) },
"twin" : { twin_properties },
"parent" : parent_id,
"modules" : {
"module_id" : {
"identity": { identity_properties },
"twin": { twin_properties }
}
}
}
}
}
"""
hub_state = {}
if HubAspects.Configurations.value in hub_aspects:
hub_aspects.remove(HubAspects.Configurations.value)
# Basic tier does not support list config
try:
all_configs = _iot_hub_configuration_list(target=target)
hub_state["configurations"] = {}
adm_configs = {}
for c in tqdm(all_configs, desc=usr_msgs.SAVE_CONFIGURATIONS_DESC, ascii=" #"):
if c["content"].get("deviceContent") or c["content"].get("moduleContent"):
for key in ["createdTimeUtc", "etag", "lastUpdatedTimeUtc", "schemaVersion"]:
c.pop(key, None)
adm_configs[c["id"]] = c
hub_state["configurations"]["admConfigurations"] = adm_configs
hub_state["configurations"]["edgeDeployments"] = {
c["id"]: c for c in all_configs if c["content"].get("modulesContent")
}
except AzCLIError:
logger.warning(usr_msgs.SAVE_CONFIGURATIONS_RETRIEVE_FAIL_MSG)
if HubAspects.Devices.value in hub_aspects:
hub_aspects.remove(HubAspects.Devices.value)
devices = self.download_devices(target=target)
if devices:
hub_state["devices"] = devices
# Controlplane using ARM
if HubAspects.Arm.value in hub_aspects:
hub_name = target.get("entity").split(".")[0]
hub_rg = target.get("resourcegroup")
control_plane_obj = self.discovery.find_resource(hub_name, hub_rg)
if not hub_rg:
hub_rg = control_plane_obj.additional_properties["resourcegroup"]
hub_resource_id = control_plane_obj.id
hub_arm = cli.invoke(f"group export -n {hub_rg} --resource-ids '{hub_resource_id}' --skip-all-params").as_json()
hub_state["arm"] = hub_arm
hub_resource = hub_state["arm"]["resources"][0]
self.check_controlplane(hub_resource=hub_resource)
print(usr_msgs.SAVE_ARM_DESC)
return hub_state