in azext_edge/edge/providers/orchestration/resource_map.py [0:0]
def refresh_resource_state(self):
refreshed_cluster_container = ClusterContainer()
custom_locations = self.connected_cluster.get_aio_custom_locations()
if custom_locations:
for cl in custom_locations:
cl_container = CustomLocationsContainer(
resource=IoTOperationsResource(
resource_id=cl["id"], display_name=cl["name"], api_version=cl["apiVersion"]
)
)
cl_sync_rules = self.connected_cluster.get_resource_sync_rules(cl["id"])
if cl_sync_rules:
for sync_rule in cl_sync_rules:
cl_container.resource_sync_rules.append(
IoTOperationsResource(
resource_id=sync_rule["id"],
display_name=sync_rule["name"],
api_version=sync_rule["apiVersion"],
)
)
cl_resources = self.connected_cluster.get_aio_resources(cl["id"])
if cl_resources:
for resource in cl_resources:
cl_container.related_resources.append(
IoTOperationsResource(
resource_id=resource["id"],
display_name=resource["name"],
api_version=resource["apiVersion"],
)
)
refreshed_cluster_container.custom_locations[cl["id"]] = cl_container
extensions = self.connected_cluster.get_aio_extensions()
if extensions:
for ext in extensions:
refreshed_cluster_container.extensions.append(
IoTOperationsResource(
resource_id=ext["id"],
display_name=ext["name"],
api_version=ext["apiVersion"],
)
)
self._cluster_container = refreshed_cluster_container