in azext_iot/iothub/providers/state.py [0:0]
def check_controlplane(self, hub_resource: dict):
"""
Check the controlplane for missing resources and connection strings.
Specifically will fetch connection strings for endpoints and file upload and check for existance of
endpoint resources, file upload resources, and user assigned identity resources.
If cannot retrieve (due to missing permissions or resource), log and remove endpoint, file upload,
or user assigned identity.
If an endpoint is removed, any route associated with it needs to be removed too.
"""
# User Assigned Identites
identities = hub_resource["identity"].get("userAssignedIdentities")
removed_identities = []
if identities:
existing_identities = {}
for identity in identities:
success = cli.invoke(f"identity show --ids {identity}").success()
if success:
existing_identities[identity] = {}
else:
logger.warning(
usr_msgs.SAVE_UAI_RETRIEVE_FAIL_MSG.format(identity)
)
removed_identities.append(identity)
hub_resource["identity"]["userAssignedIdentities"] = existing_identities
# Endpoints - build up new list of endpoints we want to keep and track the removed ones
endpoints = hub_resource["properties"]["routing"]["endpoints"]
removed_endpoints = []
# Cosmos Db
cosmos_endpoints = []
for ep in endpoints.get("cosmosDBSqlContainers", []):
account_name = ep["endpointUri"].strip("https://").split(".")[0]
if ep.get("primaryKey") or ep.get("secondaryKey"):
try:
cosmos_keys = cli.invoke(
"cosmosdb keys list --name {} --resource-group {} --type connection-strings --subscription {}".format(
account_name,
ep.get("resourceGroup"),
ep.get("subscriptionId")
)
).as_json()
except AzCLIError:
logger.warning(
usr_msgs.SAVE_ENDPOINT_RETRIEVE_FAIL_MSG.format("Cosmos DB Sql Collection", ep["name"])
)
removed_endpoints.append(ep["name"])
continue
for cs_object in cosmos_keys["connectionStrings"]:
if cs_object["description"] == "Primary SQL Connection String" and ep.get("primaryKey"):
ep["primaryKey"] = parse_cosmos_db_connection_string(cs_object["connectionString"])["AccountKey"]
if cs_object["description"] == "Secondary SQL Connection String" and ep.get("secondaryKey"):
ep["secondaryKey"] = parse_cosmos_db_connection_string(cs_object["connectionString"])["AccountKey"]
elif isinstance(ep.get("identity"), dict) and ep["identity"].get("userAssignedIdentity") in removed_identities:
logger.warning(
usr_msgs.SAVE_ENDPOINT_UAI_RETRIEVE_FAIL_MSG.format(
"Cosmos DB Sql Collection", ep["name"], ep["identity"].get("userAssignedIdentity")
)
)
removed_endpoints.append(ep["name"])
else:
success = cli.invoke(
"cosmosdb sql container show --account-name {} --resource-group {} --database-name {} "
"--name {} --subscription {}".format(
account_name,
ep.get("resourceGroup"),
ep["databaseName"],
ep["collectionName"],
ep.get("subscriptionId")
)
).success()
if not success:
logger.warning(
usr_msgs.SAVE_ENDPOINT_INFO_RETRIEVE_FAIL_MSG.format("Cosmos DB Sql Collection", ep["name"])
)
removed_endpoints.append(ep["name"])
continue
cosmos_endpoints.append(ep)
endpoints["cosmosDBSqlContainers"] = cosmos_endpoints
# Event Hub
eventhub_endpoints = []
for ep in endpoints["eventHubs"]:
if ep.get("connectionString"):
endpoint_props = parse_iot_hub_message_endpoint_connection_string(ep["connectionString"])
namespace = endpoint_props["Endpoint"].strip("sb://").split(".")[0]
try:
ep["connectionString"] = cli.invoke(
"eventhubs eventhub authorization-rule keys list --namespace-name {} --resource-group {} "
"--eventhub-name {} --name {} --subscription {}".format(
namespace,
ep.get("resourceGroup"),
endpoint_props["EntityPath"],
endpoint_props["SharedAccessKeyName"],
ep.get("subscriptionId")
)
).as_json()["primaryConnectionString"]
except AzCLIError:
logger.warning(usr_msgs.SAVE_ENDPOINT_RETRIEVE_FAIL_MSG.format("Event Hub", ep["name"]))
removed_endpoints.append(ep["name"])
continue
elif isinstance(ep.get("identity"), dict) and ep["identity"].get("userAssignedIdentity") in removed_identities:
logger.warning(
usr_msgs.SAVE_ENDPOINT_UAI_RETRIEVE_FAIL_MSG.format(
"Event Hub", ep["name"], ep["identity"].get("userAssignedIdentity")
)
)
removed_endpoints.append(ep["name"])
else:
namespace = ep["endpointUri"].strip("sb://").split(".")[0]
success = cli.invoke(
"eventhubs eventhub show --namespace-name {} --resource-group {} "
"--name {} --subscription {}".format(
namespace,
ep.get("resourceGroup"),
ep["entityPath"],
ep.get("subscriptionId")
)
).success()
if not success:
logger.warning(usr_msgs.SAVE_ENDPOINT_INFO_RETRIEVE_FAIL_MSG.format("Event Hub", ep["name"]))
removed_endpoints.append(ep["name"])
continue
eventhub_endpoints.append(ep)
endpoints["eventHubs"] = eventhub_endpoints
# Service Bus Queue
servicebus_queue_endpoints = []
for ep in endpoints["serviceBusQueues"]:
if ep.get("connectionString"):
endpoint_props = parse_iot_hub_message_endpoint_connection_string(ep["connectionString"])
namespace = endpoint_props["Endpoint"].strip("sb://").split(".")[0]
try:
ep["connectionString"] = cli.invoke(
"servicebus queue authorization-rule keys list --namespace-name {} --resource-group {} "
"--queue-name {} --name {} --subscription {}".format(
namespace,
ep.get("resourceGroup"),
endpoint_props["EntityPath"],
endpoint_props["SharedAccessKeyName"],
ep.get("subscriptionId")
)
).as_json()["primaryConnectionString"]
except AzCLIError:
logger.warning(usr_msgs.SAVE_ENDPOINT_RETRIEVE_FAIL_MSG.format("Service Bus Queue", ep["name"]))
removed_endpoints.append(ep["name"])
continue
elif isinstance(ep.get("identity"), dict) and ep["identity"].get("userAssignedIdentity") in removed_identities:
logger.warning(
usr_msgs.SAVE_ENDPOINT_UAI_RETRIEVE_FAIL_MSG.format(
"Service Bus Queue", ep["name"], ep["identity"].get("userAssignedIdentity")
)
)
removed_endpoints.append(ep["name"])
else:
namespace = ep["endpointUri"].strip("sb://").split(".")[0]
success = cli.invoke(
"servicebus queue show --namespace-name {} --resource-group {} "
"--name {} --subscription {}".format(
namespace,
ep.get("resourceGroup"),
ep["entityPath"],
ep.get("subscriptionId")
)
).success()
if not success:
logger.warning(usr_msgs.SAVE_ENDPOINT_INFO_RETRIEVE_FAIL_MSG.format("Service Bus Queue", ep["name"]))
removed_endpoints.append(ep["name"])
continue
servicebus_queue_endpoints.append(ep)
endpoints["serviceBusQueues"] = servicebus_queue_endpoints
# Service Bus Topic
servicebus_topic_endpoints = []
for ep in endpoints["serviceBusTopics"]:
if ep.get("connectionString"):
endpoint_props = parse_iot_hub_message_endpoint_connection_string(ep["connectionString"])
namespace = endpoint_props["Endpoint"].strip("sb://").split(".")[0]
try:
ep["connectionString"] = cli.invoke(
"servicebus topic authorization-rule keys list --namespace-name {} --resource-group {} "
"--topic-name {} --name {} --subscription {}".format(
namespace,
ep.get("resourceGroup"),
endpoint_props["EntityPath"],
endpoint_props["SharedAccessKeyName"],
ep.get("subscriptionId")
)
).as_json()["primaryConnectionString"]
except AzCLIError:
logger.warning(
usr_msgs.SAVE_ENDPOINT_RETRIEVE_FAIL_MSG.format("Service Bus Topic", ep["name"])
)
removed_endpoints.append(ep["name"])
continue
elif isinstance(ep.get("identity"), dict) and ep["identity"].get("userAssignedIdentity") in removed_identities:
logger.warning(
usr_msgs.SAVE_ENDPOINT_UAI_RETRIEVE_FAIL_MSG.format(
"Service Bus Topic", ep["name"], ep["identity"].get("userAssignedIdentity")
)
)
removed_endpoints.append(ep["name"])
else:
namespace = ep["endpointUri"].strip("sb://").split(".")[0]
success = cli.invoke(
"servicebus topic show --namespace-name {} --resource-group {} "
"--name {} --subscription {}".format(
namespace,
ep.get("resourceGroup"),
ep["entityPath"],
ep.get("subscriptionId")
)
).success()
if not success:
logger.warning(
usr_msgs.SAVE_ENDPOINT_INFO_RETRIEVE_FAIL_MSG.format("Service Bus Topic", ep["name"])
)
removed_endpoints.append(ep["name"])
continue
servicebus_topic_endpoints.append(ep)
endpoints["serviceBusTopics"] = servicebus_topic_endpoints
# Storage Account
storage_endpoints = []
for ep in endpoints["storageContainers"]:
if ep.get("connectionString"):
endpoint_props = parse_storage_container_connection_string(ep["connectionString"])
try:
ep["connectionString"] = cli.invoke(
"storage account show-connection-string -n {} -g {} --subscription {}".format(
endpoint_props["AccountName"],
ep.get("resourceGroup"),
ep.get("subscriptionId")
)
).as_json()["connectionString"]
except AzCLIError:
logger.warning(
usr_msgs.SAVE_ENDPOINT_RETRIEVE_FAIL_MSG.format("Storage Container", ep["name"])
)
removed_endpoints.append(ep["name"])
continue
elif isinstance(ep.get("identity"), dict) and ep["identity"].get("userAssignedIdentity") in removed_identities:
logger.warning(
usr_msgs.SAVE_ENDPOINT_UAI_RETRIEVE_FAIL_MSG.format(
"Storage Container", ep["name"], ep["identity"].get("userAssignedIdentity")
)
)
removed_endpoints.append(ep["name"])
else:
account_name = ep["endpointUri"].strip("https://").split(".")[0]
success = cli.invoke(
"storage account show --name {} --subscription {}".format(
account_name,
ep.get("subscriptionId"),
)
).success()
if not success:
logger.warning(
usr_msgs.SAVE_ENDPOINT_INFO_RETRIEVE_FAIL_MSG.format("Storage Container", ep["name"])
)
removed_endpoints.append(ep["name"])
continue
storage_endpoints.append(ep)
endpoints["storageContainers"] = storage_endpoints
# Go through routes to remove any that use removed endpoints
routes = hub_resource["properties"]["routing"]["routes"]
filtered_routes = [route for route in routes if route["endpointNames"][0] not in removed_endpoints]
hub_resource["properties"]["routing"]["routes"] = filtered_routes
filtered_routes = []
for route in routes:
if route["endpointNames"][0] in removed_endpoints:
logger.warning(usr_msgs.SAVE_ROUTE_FAIL_MSG.format(route["name"], route["endpointNames"][0]))
else:
filtered_routes.append(route)
hub_resource["properties"]["routing"]["routes"] = filtered_routes
# File upload
file_upload = hub_resource["properties"]["storageEndpoints"].get("$default", {})
if (
isinstance(file_upload.get("identity"), dict)
and file_upload["identity"].get("userAssignedIdentity") in removed_identities
):
logger.warning(
usr_msgs.SAVE_FILE_UPLOAD_UAI_RETRIEVE_FAIL_MSG.format(file_upload["identity"].get("userAssignedIdentity"))
)
file_upload["authenticationType"] = None
file_upload["connectionString"] = None
file_upload["containerName"] = None
file_upload["identity"] = None
elif file_upload.get("connectionString"):
endpoint_props = parse_storage_container_connection_string(file_upload["connectionString"])
try:
file_upload["connectionString"] = cli.invoke(
"storage account show-connection-string -n {}".format(
endpoint_props["AccountName"]
)
).as_json()["connectionString"]
except AzCLIError:
logger.warning(usr_msgs.SAVE_FILE_UPLOAD_RETRIEVE_FAIL_MSG)
file_upload["authenticationType"] = None
file_upload["connectionString"] = None
file_upload["containerName"] = None