python/az/aro/azext_aro/custom.py (595 lines of code) (raw):

# Copyright (c) Microsoft Corporation. # Licensed under the Apache License 2.0. import collections import random import os from base64 import b64decode import textwrap import azext_aro.vendored_sdks.azure.mgmt.redhatopenshift.v2024_08_12_preview.models as openshiftcluster from azure.cli.command_modules.role import GraphError from azure.cli.core.commands import LongRunningOperation from azure.cli.core.commands.client_factory import ( get_mgmt_service_client, get_subscription_id ) from azure.cli.core.profiles import ResourceType from azure.cli.core.util import sdk_no_wait from azure.cli.core.azclierror import ( FileOperationError, ResourceNotFoundError, InvalidArgumentValueError, UnauthorizedError, ValidationError ) from azure.core.exceptions import ResourceNotFoundError as CoreResourceNotFoundError from azext_aro._aad import AADManager from azext_aro._rbac import ( assign_role_to_resource, has_role_assignment_on_resource ) from azext_aro._rbac import ( ROLE_NETWORK_CONTRIBUTOR, ROLE_READER ) from azext_aro._validators import validate_subnets from azext_aro._dynamic_validators import validate_cluster_create, validate_cluster_delete from azext_aro.aaz.latest.identity import Delete as identity_delete from azext_aro.aaz.latest.network.vnet.subnet import Show as subnet_show from knack.log import get_logger from msrestazure.azure_exceptions import CloudError from msrestazure.tools import ( resource_id, parse_resource_id ) from msrest.exceptions import HttpOperationError from tabulate import tabulate logger = get_logger(__name__) FP_CLIENT_ID = 'f1dd0a37-89c6-4e07-bcd1-ffd3d43d8875' def rp_mode_development(): return os.environ.get('RP_MODE', '').lower() == 'development' def aro_create(cmd, # pylint: disable=too-many-locals client, resource_group_name, resource_name, master_subnet, worker_subnet, vnet=None, # pylint: disable=unused-argument vnet_resource_group_name=None, # pylint: disable=unused-argument enable_preconfigured_nsg=None, location=None, pull_secret=None, domain=None, cluster_resource_group=None, fips_validated_modules=None, client_id=None, client_secret=None, pod_cidr=None, service_cidr=None, outbound_type=None, disk_encryption_set=None, master_encryption_at_host=False, master_vm_size=None, worker_encryption_at_host=False, worker_vm_size=None, worker_vm_disk_size_gb=None, worker_count=None, apiserver_visibility=None, ingress_visibility=None, load_balancer_managed_outbound_ip_count=None, enable_managed_identity=False, platform_workload_identities=None, mi_user_assigned=None, tags=None, version=None, no_wait=False): if not rp_mode_development(): resource_client = get_mgmt_service_client( cmd.cli_ctx, ResourceType.MGMT_RESOURCE_RESOURCES) provider = resource_client.providers.get('Microsoft.RedHatOpenShift') if provider.registration_state != 'Registered': raise UnauthorizedError('Microsoft.RedHatOpenShift provider is not registered.', 'Run `az provider register -n Microsoft.RedHatOpenShift --wait`.') validate_subnets(master_subnet, worker_subnet) validate(cmd, client, resource_group_name, resource_name, master_subnet, worker_subnet, vnet=vnet, enable_preconfigured_nsg=enable_preconfigured_nsg, cluster_resource_group=cluster_resource_group, client_id=client_id, client_secret=client_secret, vnet_resource_group_name=vnet_resource_group_name, disk_encryption_set=disk_encryption_set, location=location, version=version, pod_cidr=pod_cidr, service_cidr=service_cidr, warnings_as_text=True) subscription_id = get_subscription_id(cmd.cli_ctx) random_id = generate_random_id() aad = AADManager(cmd.cli_ctx) if not enable_managed_identity: if client_id is None: client_id, client_secret = aad.create_application(cluster_resource_group or 'aro-' + random_id) client_sp_id = aad.get_service_principal_id(client_id) if not client_sp_id: client_sp_id = aad.create_service_principal(client_id) rp_client_sp_id = aad.get_service_principal_id(resolve_rp_client_id()) if not rp_client_sp_id: raise ResourceNotFoundError("RP service principal not found.") if rp_mode_development(): worker_vm_size = worker_vm_size or 'Standard_D2s_v5' else: worker_vm_size = worker_vm_size or 'Standard_D4s_v5' if apiserver_visibility is not None: apiserver_visibility = apiserver_visibility.capitalize() if ingress_visibility is not None: ingress_visibility = ingress_visibility.capitalize() load_balancer_profile = None if load_balancer_managed_outbound_ip_count is not None: load_balancer_profile = openshiftcluster.LoadBalancerProfile() load_balancer_profile.managed_outbound_ips = openshiftcluster.ManagedOutboundIPs() load_balancer_profile.managed_outbound_ips.count = load_balancer_managed_outbound_ip_count # pylint: disable=line-too-long oc = openshiftcluster.OpenShiftCluster( location=location, tags=tags, cluster_profile=openshiftcluster.ClusterProfile( pull_secret=pull_secret or "", domain=domain or random_id, resource_group_id=(f"/subscriptions/{subscription_id}" f"/resourceGroups/{cluster_resource_group or 'aro-' + random_id}"), fips_validated_modules='Enabled' if fips_validated_modules else 'Disabled', version=version or '', ), network_profile=openshiftcluster.NetworkProfile( pod_cidr=pod_cidr or '10.128.0.0/14', service_cidr=service_cidr or '172.30.0.0/16', outbound_type=outbound_type or '', load_balancer_profile=load_balancer_profile, preconfigured_nsg='Enabled' if enable_preconfigured_nsg else 'Disabled', ), master_profile=openshiftcluster.MasterProfile( vm_size=master_vm_size or 'Standard_D8s_v5', subnet_id=master_subnet, encryption_at_host='Enabled' if master_encryption_at_host else 'Disabled', disk_encryption_set_id=disk_encryption_set, ), worker_profiles=[ openshiftcluster.WorkerProfile( name='worker', # TODO: 'worker' should not be hard-coded vm_size=worker_vm_size, disk_size_gb=worker_vm_disk_size_gb or 128, subnet_id=worker_subnet, count=worker_count or 3, encryption_at_host='Enabled' if worker_encryption_at_host else 'Disabled', disk_encryption_set_id=disk_encryption_set, ) ], apiserver_profile=openshiftcluster.APIServerProfile( visibility=apiserver_visibility or 'Public', ), ingress_profiles=[ openshiftcluster.IngressProfile( name='default', # TODO: 'default' should not be hard-coded visibility=ingress_visibility or 'Public', ) ], service_principal_profile=None, platform_workload_identity_profile=None, ) if enable_managed_identity is True: oc.platform_workload_identity_profile = openshiftcluster.PlatformWorkloadIdentityProfile( platform_workload_identities=dict(platform_workload_identities) ) oc.identity = openshiftcluster.ManagedServiceIdentity( type='UserAssigned', user_assigned_identities={mi_user_assigned: {}} ) # TODO - perform client-side validation of required identity permissions else: oc.service_principal_profile = openshiftcluster.ServicePrincipalProfile( client_id=client_id, client_secret=client_secret, ) sp_obj_ids = [client_sp_id, rp_client_sp_id] ensure_resource_permissions(cmd.cli_ctx, oc, True, sp_obj_ids) return sdk_no_wait(no_wait, client.open_shift_clusters.begin_create_or_update, resource_group_name=resource_group_name, resource_name=resource_name, parameters=oc) def validate(cmd, # pylint: disable=too-many-locals,too-many-statements client, # pylint: disable=unused-argument resource_group_name, # pylint: disable=unused-argument resource_name, # pylint: disable=unused-argument master_subnet, worker_subnet, vnet=None, enable_preconfigured_nsg=None, cluster_resource_group=None, # pylint: disable=unused-argument client_id=None, client_secret=None, # pylint: disable=unused-argument vnet_resource_group_name=None, # pylint: disable=unused-argument disk_encryption_set=None, location=None, # pylint: disable=unused-argument version=None, pod_cidr=None, # pylint: disable=unused-argument service_cidr=None, # pylint: disable=unused-argument enable_managed_identity=False, # pylint: disable=unused-argument platform_workload_identities=None, # pylint: disable=unused-argument mi_user_assigned=None, # pylint: disable=unused-argument warnings_as_text=False): class mockoc: # pylint: disable=too-few-public-methods def __init__(self, disk_encryption_id, master_subnet_id, worker_subnet_id, preconfigured_nsg): self.network_profile = openshiftcluster.NetworkProfile( preconfigured_nsg='Enabled' if preconfigured_nsg else 'Disabled' ) self.master_profile = openshiftcluster.MasterProfile( subnet_id=master_subnet_id, disk_encryption_set_id=disk_encryption_id ) self.worker_profiles = [openshiftcluster.WorkerProfile( subnet_id=worker_subnet_id )] self.worker_profiles_status = None aad = AADManager(cmd.cli_ctx) rp_client_sp_id = aad.get_service_principal_id(resolve_rp_client_id()) if not rp_client_sp_id: raise ResourceNotFoundError("RP service principal not found.") sp_obj_ids = [rp_client_sp_id] if client_id is not None: sp_obj_ids.append(aad.get_service_principal_id(client_id)) cluster = mockoc(disk_encryption_set, master_subnet, worker_subnet, enable_preconfigured_nsg) try: # Get cluster resources we need to assign permissions on, sort to ensure the same order of operations resources = {ROLE_NETWORK_CONTRIBUTOR: sorted(get_cluster_network_resources(cmd.cli_ctx, cluster, True)), ROLE_READER: sorted(get_disk_encryption_resources(cluster))} except (CloudError, HttpOperationError) as e: logger.error(e.message) raise if vnet is None: master_parts = parse_resource_id(master_subnet) vnet = resource_id( subscription=master_parts['subscription'], resource_group=master_parts['resource_group'], namespace='Microsoft.Network', type='virtualNetworks', name=master_parts['name'], ) error_objects = validate_cluster_create(version, resources, sp_obj_ids) errors_and_warnings = [] for error_func in error_objects: namespace = collections.namedtuple("Namespace", locals().keys())(*locals().values()) error_obj = error_func(cmd, namespace) if error_obj != []: for err in error_obj: # Wrap text so tabulate returns a pretty table new_err = [] for txt in err: new_err.append(textwrap.fill(txt, width=160)) errors_and_warnings.append(new_err) warnings = [] errors = [] if len(errors_and_warnings) > 0: # Separate errors and warnings into separate arrays for issue in errors_and_warnings: if issue[2] == "Warning": warnings.append(issue) else: errors.append(issue) else: logger.info("No validation errors or warnings") if len(warnings) > 0: if len(errors) == 0 and warnings_as_text: full_msg = "" for warning in warnings: full_msg = full_msg + f"{warning[3]}\n" else: headers = ["Type", "Name", "Severity", "Description"] table = tabulate(warnings, headers=headers, tablefmt="grid") full_msg = f"The following issues will have a minor impact on cluster creation:\n{table}" logger.warning(full_msg) if len(errors) > 0: if len(warnings) > 0: full_msg = "\n" else: full_msg = "" headers = ["Type", "Name", "Severity", "Description"] table = tabulate(errors, headers=headers, tablefmt="grid") full_msg = full_msg + f"The following errors are fatal and will block cluster creation:\n{table}" raise ValidationError(full_msg) def aro_validate(cmd, # pylint: disable=too-many-locals,too-many-statements client, resource_group_name, resource_name, master_subnet, worker_subnet, vnet=None, cluster_resource_group=None, client_id=None, client_secret=None, vnet_resource_group_name=None, disk_encryption_set=None, location=None, version=None, pod_cidr=None, service_cidr=None, enable_managed_identity=False, platform_workload_identities=None, mi_user_assigned=None, ): validate(cmd, client, resource_group_name, resource_name, master_subnet, worker_subnet, vnet=vnet, cluster_resource_group=cluster_resource_group, client_id=client_id, client_secret=client_secret, vnet_resource_group_name=vnet_resource_group_name, disk_encryption_set=disk_encryption_set, location=location, version=version, pod_cidr=pod_cidr, service_cidr=service_cidr, enable_managed_identity=enable_managed_identity, platform_workload_identities=platform_workload_identities, mi_user_assigned=mi_user_assigned, warnings_as_text=False) def aro_delete(cmd, client, resource_group_name, resource_name, no_wait=False, delete_identities=None): # TODO: clean up rbac rp_client_sp_id = None try: oc = client.open_shift_clusters.get(resource_group_name, resource_name) except CloudError as e: if e.status_code == 404: raise ResourceNotFoundError(e.message) from e logger.info(e.message) except HttpOperationError as e: logger.info(e.message) if delete_identities and oc.service_principal_profile is not None: raise InvalidArgumentValueError( "Cannot delete managed identities for a non-managed identity cluster" ) # Since we delete the managed identities only after deleting the cluster, # it is critical that we log the list of managed identities while we're # still able to get it from the cluster doc. This way, if the CLI fails in # the middle of cluster deletion, etc., the customer will still have access # to the list in case they want to know which identities to delete. managed_identities = [] if oc.identity is not None and oc.identity.user_assigned_identities is not None: managed_identities += list(oc.identity.user_assigned_identities) if oc.platform_workload_identity_profile is not None: managed_identities += [pwi.resource_id for _, pwi in oc.platform_workload_identity_profile.platform_workload_identities.items()] # pylint: disable=line-too-long errors = validate_cluster_delete(cmd, delete_identities, managed_identities) if errors: error_messages = "- " + "\n- ".join(errors) raise UnauthorizedError(f"Pre-delete validation failed with the following issues:\n{error_messages}") if delete_identities: bulleted_mi_list = "\n".join([f"- {mi}" for mi in managed_identities]) logger.warning("After deleting the ARO cluster, will delete the following set of managed identities that was associated with it:\n%s", bulleted_mi_list) # pylint: disable=line-too-long elif oc.platform_workload_identity_profile is not None: bulleted_delete_command_list = "\n".join([f"- az identity delete -g {parse_resource_id(mi)['resource_group']} -n {parse_resource_id(mi)['name']}" for mi in managed_identities]) # pylint: disable=line-too-long logger.warning("The cluster's managed identities will still need to be deleted once cluster deletion completes. You can use the following commands to delete them:\n%s", bulleted_delete_command_list) # pylint: disable=line-too-long aad = AADManager(cmd.cli_ctx) # Best effort - assume the role assignments on the SP exist if exception raised try: rp_client_sp_id = aad.get_service_principal_id(resolve_rp_client_id()) if not rp_client_sp_id: raise ResourceNotFoundError("RP service principal not found.") except GraphError as e: logger.info(e) # Customers frequently remove the Cluster or RP's service principal permissions. # Attempt to fix this before performing any action against the cluster if rp_client_sp_id: ensure_resource_permissions(cmd.cli_ctx, oc, False, [rp_client_sp_id]) if delete_identities: # Note that because we need to confirm the cluster's successful deletion before # deleting the managed identities, we must wait for the asynchronous operation # to complete here and handle the result rather than using sdk_no_wait. result = LongRunningOperation(cmd.cli_ctx)(client.open_shift_clusters.begin_delete(resource_group_name=resource_group_name, # pylint: disable=line-too-long resource_name=resource_name, polling=True)) logger.warning("Successfully deleted ARO cluster; deleting managed identities...") for mi in managed_identities: mi_resource_id = parse_resource_id(mi) # You might think we'd want to log a different message in the case where # the identity is not found, but the delete command is idempotent and # will not raise 404 exceptions. We want all other exceptions to be raised # directly to the user though, hence the lack of a try/except. identity_delete(cli_ctx=cmd.cli_ctx)(command_args={ 'resource_name': mi_resource_id['name'], 'resource_group': mi_resource_id['resource_group'], }) logger.warning("Successfully deleted managed identity %s", mi) return result return sdk_no_wait(no_wait, client.open_shift_clusters.begin_delete, resource_group_name=resource_group_name, resource_name=resource_name) def aro_list(client, resource_group_name=None): if resource_group_name: return client.open_shift_clusters.list_by_resource_group(resource_group_name) return client.open_shift_clusters.list() def aro_show(client, resource_group_name, resource_name): return client.open_shift_clusters.get(resource_group_name, resource_name) def aro_list_credentials(client, resource_group_name, resource_name): return client.open_shift_clusters.list_credentials(resource_group_name, resource_name) def aro_get_admin_kubeconfig(client, resource_group_name, resource_name, file="kubeconfig"): query_result = client.open_shift_clusters.list_admin_credentials(resource_group_name, resource_name) file_mode = "x" yaml_data = b64decode(query_result.kubeconfig).decode('UTF-8') try: with open(file, file_mode, encoding="utf-8") as f: f.write(yaml_data) except FileExistsError as e: raise FileOperationError(f"File {file} already exists.") from e logger.info("Kubeconfig written to file: %s", file) def aro_get_versions(client, location): items = client.open_shift_versions.list(location) versions = [] for item in items: versions.append(item.version) return sorted(versions) def aro_update(cmd, client, resource_group_name, resource_name, refresh_cluster_credentials=False, client_id=None, client_secret=None, platform_workload_identities=None, load_balancer_managed_outbound_ip_count=None, upgradeable_to=None, no_wait=False): # if we can't read cluster spec, we will not be able to do much. Fail. oc = client.open_shift_clusters.get(resource_group_name, resource_name) oc_update = openshiftcluster.OpenShiftClusterUpdate() if platform_workload_identities is not None and oc.service_principal_profile is not None: raise InvalidArgumentValueError( "Cannot assign platform workload identities to a cluster with service principal" ) if oc.service_principal_profile is not None: client_id, client_secret = cluster_application_update(cmd.cli_ctx, oc, client_id, client_secret, refresh_cluster_credentials) # pylint: disable=line-too-long if client_id is not None or client_secret is not None: # construct update payload oc_update.service_principal_profile = openshiftcluster.ServicePrincipalProfile() if client_secret is not None: oc_update.service_principal_profile.client_secret = client_secret if client_id is not None: oc_update.service_principal_profile.client_id = client_id if oc.platform_workload_identity_profile is not None: if platform_workload_identities is not None or upgradeable_to is not None: oc_update.platform_workload_identity_profile = openshiftcluster.PlatformWorkloadIdentityProfile() if platform_workload_identities is not None: oc_update.platform_workload_identity_profile.platform_workload_identities = dict(platform_workload_identities) # pylint: disable=line-too-long if upgradeable_to is not None: oc_update.platform_workload_identity_profile.upgradeable_to = upgradeable_to if load_balancer_managed_outbound_ip_count is not None: oc_update.network_profile = openshiftcluster.NetworkProfile() oc_update.network_profile.load_balancer_profile = openshiftcluster.LoadBalancerProfile() oc_update.network_profile.load_balancer_profile.managed_outbound_ips = openshiftcluster.ManagedOutboundIPs() oc_update.network_profile.load_balancer_profile.managed_outbound_ips.count = load_balancer_managed_outbound_ip_count # pylint: disable=line-too-long return sdk_no_wait(no_wait, client.open_shift_clusters.begin_update, resource_group_name=resource_group_name, resource_name=resource_name, parameters=oc_update) def generate_random_id(): random_id = (random.choice('abcdefghijklmnopqrstuvwxyz') + ''.join(random.choice('abcdefghijklmnopqrstuvwxyz1234567890') for _ in range(7))) return random_id def get_network_resources_from_subnets(cli_ctx, subnets, fail, oc): subnet_resources = set() subnets_with_no_nsg_attached = set() for sn in subnets: sid = parse_resource_id(sn) if 'resource_group' not in sid or 'name' not in sid or 'resource_name' not in sid: if fail: raise ValidationError(f"""(ValidationError) Failed to validate subnet '{sn}'. Please retry, if issue persists: raise azure support ticket""") logger.info("Failed to validate subnet '%s'", sn) try: subnet = subnet_show(cli_ctx=cli_ctx)(command_args={ "name": sid['resource_name'], "vnet_name": sid['name'], "resource_group": sid['resource_group']} ) except CoreResourceNotFoundError: continue if subnet.get("routeTable", None): subnet_resources.add(subnet['routeTable']['id']) if subnet.get("natGateway", None): subnet_resources.add(subnet['natGateway']['id']) if oc.network_profile.preconfigured_nsg == 'Enabled': if subnet.get("networkSecurityGroup", None): subnet_resources.add(subnet['networkSecurityGroup']['id']) else: subnets_with_no_nsg_attached.add(sn) # when preconfiguredNSG feature is Enabled we either have all subnets NSG attached or none. if oc.network_profile.preconfigured_nsg == 'Enabled' and \ len(subnets_with_no_nsg_attached) != 0 and \ len(subnets_with_no_nsg_attached) != len(subnets): raise ValidationError(f"(ValidationError) preconfiguredNSG feature is enabled but an NSG is\ not attached for all required subnets. Please make sure all the following\ subnets have a network security groups attached and retry.\ {subnets_with_no_nsg_attached}") return subnet_resources def get_cluster_network_resources(cli_ctx, oc, fail): master_subnet = oc.master_profile.subnet_id worker_subnets = set() # Ensure that worker_profiles exists # it will not be returned if the cluster resources do not exist if oc.worker_profiles is not None: worker_subnets = {w.subnet_id for w in oc.worker_profiles} # Ensure that worker_profiles_status exists # it will not be returned if the cluster resources do not exist if oc.worker_profiles_status is not None: worker_subnets |= {w.subnet_id for w in oc.worker_profiles_status} master_parts = parse_resource_id(master_subnet) vnet = resource_id( subscription=master_parts['subscription'], resource_group=master_parts['resource_group'], namespace='Microsoft.Network', type='virtualNetworks', name=master_parts['name'], ) return get_network_resources(cli_ctx, worker_subnets | {master_subnet}, vnet, fail, oc) def get_network_resources(cli_ctx, subnets, vnet, fail, oc): subnet_resources = get_network_resources_from_subnets(cli_ctx, subnets, fail, oc) resources = set() resources.add(vnet) resources.update(subnet_resources) return resources def get_disk_encryption_resources(oc): disk_encryption_set = oc.master_profile.disk_encryption_set_id resources = set() if disk_encryption_set: resources.add(disk_encryption_set) return resources # cluster_application_update manages cluster application & service principal update # If called without parameters it should be best-effort # If called with parameters it fails if something is not possible # Flow: # 1. Set fail - if we are in fail mode or best effort. # 2. Sort out client_id, rp_client_sp, resources we care for RBAC. # 3. If we are in refresh_cluster_credentials mode - attempt to reuse/recreate # cluster service principal application and acquire client_id, client_secret # 4. Reuse/Recreate service principal. # 5. Sort out required rbac def cluster_application_update(cli_ctx, oc, client_id, client_secret, refresh_cluster_credentials): # QUESTION: is there possible unification with the create path? rp_client_sp_id = None client_sp_id = None random_id = generate_random_id() # if any of these are set - we expect users to have access to fix rbac so we fail # common for 1 and 2 flows fail = client_id or client_secret or refresh_cluster_credentials aad = AADManager(cli_ctx) # check if we can see if RP service principal exists try: rp_client_sp_id = aad.get_service_principal_id(resolve_rp_client_id()) if not rp_client_sp_id: raise ResourceNotFoundError("RP service principal not found.") except GraphError as e: if fail: logger.error(e) raise logger.info(e) # refresh_cluster_credentials refreshes cluster SP application. # At firsts it tries to re-use existing application and generate new password. # If application does not exist - creates new one if refresh_cluster_credentials: try: app = aad.get_application_object_id_by_client_id(client_id or oc.service_principal_profile.client_id) if not app: # we were not able to find and applications, create new one parts = parse_resource_id(oc.cluster_profile.resource_group_id) cluster_resource_group = parts['resource_group'] client_id, client_secret = aad.create_application(cluster_resource_group or 'aro-' + random_id) else: client_secret = aad.add_password(app) except GraphError as e: logger.error(e) raise # attempt to get/create SP if one was not found. try: client_sp_id = aad.get_service_principal_id(client_id or oc.service_principal_profile.client_id) except GraphError as e: if fail: logger.error(e) raise logger.info(e) if fail and not client_sp_id: client_sp_id = aad.create_service_principal(client_id or oc.service_principal_profile.client_id) sp_obj_ids = [sp for sp in [rp_client_sp_id, client_sp_id] if sp] ensure_resource_permissions(cli_ctx, oc, fail, sp_obj_ids) return client_id, client_secret def resolve_rp_client_id(): if rp_mode_development(): return os.environ.get('AZURE_FP_CLIENT_ID', FP_CLIENT_ID) return FP_CLIENT_ID def ensure_resource_permissions(cli_ctx, oc, fail, sp_obj_ids): try: # Get cluster resources we need to assign permissions on, sort to ensure the same order of operations resources = {ROLE_NETWORK_CONTRIBUTOR: sorted(get_cluster_network_resources(cli_ctx, oc, fail)), ROLE_READER: sorted(get_disk_encryption_resources(oc))} except (CloudError, HttpOperationError) as e: if fail: logger.error(e.message) raise logger.info(e.message) return for sp_id in sp_obj_ids: for role in sorted(resources): for resource in resources[role]: # Create the role assignment if it doesn't exist # Assume that the role assignment exists if we fail to look it up resource_contributor_exists = True try: resource_contributor_exists = has_role_assignment_on_resource(cli_ctx, resource, sp_id, role) except CloudError as e: if fail: logger.error(e.message) raise logger.info(e.message) if not resource_contributor_exists: assign_role_to_resource(cli_ctx, resource, sp_id, role)