python/az/aro/azext_aro/_validators.py (299 lines of code) (raw):

# Copyright (c) Microsoft Corporation.
# Licensed under the Apache License 2.0.

import ipaddress
import json
import re
import uuid
from os.path import exists
from collections import Counter

from azure.cli.core.commands.client_factory import get_mgmt_service_client, get_subscription_id
from azure.cli.core.profiles import ResourceType
from azure.cli.core.azclierror import (
    CLIInternalError,
    InvalidArgumentValueError,
    RequiredArgumentMissingError,
    MutuallyExclusiveArgumentError
)
from azure.core.exceptions import ResourceNotFoundError
from knack.log import get_logger
from msrestazure.azure_exceptions import CloudError
from msrestazure.tools import is_valid_resource_id, parse_resource_id, resource_id
from azext_aro.aaz.latest.network.vnet.subnet import Show as subnet_show


logger = get_logger(__name__)


def validate_cidr(key):
    """Build a validator that checks ``namespace.<key>`` is an IPv4 network.

    The returned callable does nothing when the attribute is None and raises
    InvalidArgumentValueError when ``ipaddress.IPv4Network`` rejects the value.
    """
    def _validate_cidr(namespace):
        cidr = getattr(namespace, key)
        if cidr is None:
            return
        try:
            ipaddress.IPv4Network(cidr)
        except ValueError as e:
            raise InvalidArgumentValueError(f"Invalid --{key.replace('_', '-')} '{cidr}'.") from e

    return _validate_cidr


def validate_client_id(isCreate):
    """Build a validator for ``--client-id``.

    The returned callable does nothing when no client id was given. Otherwise
    it rejects combinations with managed identity / platform workload
    identities, requires the id to parse as a UUID, requires a non-empty
    ``--client-secret`` and, when ``isCreate`` is false, rejects use together
    with ``--upgradeable-to``.
    """
    def _validate_client_id(namespace):
        if namespace.client_id is None:
            return
        # Not every command exposes enable_managed_identity, hence the hasattr guard.
        if hasattr(namespace, 'enable_managed_identity') and namespace.enable_managed_identity is True:
            raise MutuallyExclusiveArgumentError('Must not specify --client-id when --enable-managed-identity is True')  # pylint: disable=line-too-long
        if namespace.platform_workload_identities is not None:
            raise MutuallyExclusiveArgumentError('Must not specify --client-id when --assign-platform-workload-identity is used')  # pylint: disable=line-too-long
        try:
            uuid.UUID(namespace.client_id)
        except ValueError as e:
            raise InvalidArgumentValueError(f"Invalid --client-id '{namespace.client_id}'.") from e  # pylint: disable=line-too-long
        if namespace.client_secret is None or not str(namespace.client_secret):
            raise RequiredArgumentMissingError('Must specify --client-secret with --client-id.')  # pylint: disable=line-too-long
        if not isCreate and namespace.upgradeable_to is not None:
            raise MutuallyExclusiveArgumentError('Must not specify --client-id when --upgradeable-to is used.')  # pylint: disable=line-too-long

    return _validate_client_id


def validate_client_secret(isCreate):
    """Build a validator for ``--client-secret``.

    The returned callable does nothing when no secret was given. Otherwise it
    rejects combinations with managed identity / platform workload identities,
    requires a non-empty ``--client-id`` when ``isCreate`` is true and rejects
    use together with ``--upgradeable-to`` when ``isCreate`` is false.
    """
    def _validate_client_secret(namespace):
        if namespace.client_secret is None:
            return
        # Not every command exposes enable_managed_identity, hence the hasattr guard.
        if hasattr(namespace, 'enable_managed_identity') and namespace.enable_managed_identity is True:
            raise MutuallyExclusiveArgumentError('Must not specify --client-secret when --enable-managed-identity is True')  # pylint: disable=line-too-long
        if namespace.platform_workload_identities is not None:
            raise MutuallyExclusiveArgumentError('Must not specify --client-secret when --assign-platform-workload-identity is used')  # pylint: disable=line-too-long
        if isCreate and (namespace.client_id is None or not str(namespace.client_id)):
            raise RequiredArgumentMissingError('Must specify --client-id with --client-secret.')
        if not isCreate and namespace.upgradeable_to is not None:
            raise MutuallyExclusiveArgumentError('Must not specify --client-secret when --upgradeable-to is used.')  # pylint: disable=line-too-long

    return _validate_client_secret


def validate_cluster_resource_group(cmd, namespace):
    """Require that ``--cluster-resource-group``, when given, does not exist yet.

    Raises InvalidArgumentValueError when the resource client reports that the
    named resource group already exists.
    """
    if namespace.cluster_resource_group is None:
        return

    client = get_mgmt_service_client(
        cmd.cli_ctx, ResourceType.MGMT_RESOURCE_RESOURCES)

    if client.resource_groups.check_existence(namespace.cluster_resource_group):
        raise InvalidArgumentValueError(
            f"Invalid --cluster-resource-group '{namespace.cluster_resource_group}':"
            " resource group must not exist.")


def validate_disk_encryption_set(cmd, namespace):
    """Require ``--disk-encryption-set`` to be a resource ID that can be fetched.

    Raises InvalidArgumentValueError when the value is not a valid resource ID
    or when the compute client raises CloudError while getting it.
    """
    if namespace.disk_encryption_set is None:
        return

    if not is_valid_resource_id(namespace.disk_encryption_set):
        raise InvalidArgumentValueError(
            f"Invalid --disk-encryption-set '{namespace.disk_encryption_set}', has to be a resource ID.")

    desid = parse_resource_id(namespace.disk_encryption_set)
    compute_client = get_mgmt_service_client(cmd.cli_ctx, ResourceType.MGMT_COMPUTE)
    try:
        # The result is discarded; the call is made only to prove the set is reachable.
        compute_client.disk_encryption_sets.get(resource_group_name=desid['resource_group'],
                                                disk_encryption_set_name=desid['name'])
    except CloudError as err:
        raise InvalidArgumentValueError(
            f"Invalid --disk-encryption-set, error when getting '{namespace.disk_encryption_set}':"
            f" {str(err)}") from err


def validate_domain(namespace):
    """Require ``--domain``, when given, to be dot-separated lowercase labels.

    Each label is 1-63 characters of ``[a-z0-9-]`` that neither starts nor
    ends with a hyphen. Raises InvalidArgumentValueError otherwise.
    """
    if namespace.domain is None:
        return

    if not re.match(r'^' +
                    r'([a-z0-9]|[a-z0-9][-a-z0-9]{0,61}[a-z0-9])' +
                    r'(\.([a-z0-9]|[a-z0-9][-a-z0-9]{0,61}[a-z0-9]))*' +
                    r'$', namespace.domain):
        raise InvalidArgumentValueError(f"Invalid --domain '{namespace.domain}'.")


def validate_pull_secret(namespace):
    """Load and sanity-check ``--pull-secret``.

    When no pull secret is given a warning is logged and nothing else happens.
    When the value names an existing file, ``namespace.pull_secret`` is
    replaced by the file's content with trailing newlines stripped. The
    resulting text must decode as a JSON object; any failure is reported as
    InvalidArgumentValueError.
    """
    if namespace.pull_secret is None:
        # TODO: add aka.ms link here
        warning = "No --pull-secret provided: cluster will not include samples or operators from " + \
            "Red Hat or from certified partners."
        logger.warning(warning)
        return

    try:
        if exists(namespace.pull_secret):
            with open(namespace.pull_secret, 'r', encoding='utf-8') as file:
                namespace.pull_secret = file.read().rstrip('\n')

        if not isinstance(json.loads(namespace.pull_secret), dict):
            raise ValueError("pull secret is not a JSON object")
    except Exception as e:
        # Deliberately broad: I/O, decoding and JSON errors all surface as the
        # same user-facing message.
        raise InvalidArgumentValueError("Invalid --pull-secret.") from e


def validate_outbound_type(namespace):
    """Check ``--outbound-type`` and its compatibility with the visibilities.

    The value must be 'UserDefinedRouting', 'Loadbalancer' or None.
    'UserDefinedRouting' is rejected when either the ingress or the API server
    visibility is 'Public' or unset. Raises InvalidArgumentValueError.
    """
    outbound_type = getattr(namespace, 'outbound_type')
    if outbound_type not in {'UserDefinedRouting', 'Loadbalancer', None}:
        raise InvalidArgumentValueError('Invalid --outbound-type: must be "UserDefinedRouting" or "Loadbalancer"')

    ingress_visibility = getattr(namespace, 'ingress_visibility')
    apiserver_visibility = getattr(namespace, 'apiserver_visibility')

    if (outbound_type == 'UserDefinedRouting' and
            (is_visibility_public(ingress_visibility) or is_visibility_public(apiserver_visibility))):
        raise InvalidArgumentValueError('Invalid --outbound-type: cannot use UserDefinedRouting when ' +
                                        'either --apiserver-visibility or --ingress-visibility is set ' +
                                        'to Public or not defined')


def is_visibility_public(visibility):
    """Return True when ``visibility`` is 'Public' or None (unset)."""
    return visibility == 'Public' or visibility is None


def validate_subnet(key):
    """Build a validator for the subnet argument stored at ``namespace.<key>``.

    The returned callable expands a bare subnet name into a full resource ID
    using ``--vnet`` (writing the result back to the namespace), checks that
    the ID is a ``Microsoft.Network/virtualNetworks/<vnet>/subnets/<name>`` in
    the current subscription, and finally fetches the subnet.

    Raises RequiredArgumentMissingError when a bare name is given without
    ``--vnet``, InvalidArgumentValueError when the ID has the wrong shape or
    the subnet is not found, and CLIInternalError for any other failure while
    fetching it.
    """
    flag = f"--{key.replace('_', '-')}"

    def _validate_subnet(cmd, namespace):
        subnet = getattr(namespace, key)

        if not is_valid_resource_id(subnet):
            if not namespace.vnet:
                raise RequiredArgumentMissingError(f"Must specify --vnet if {flag} is not an id.")

            # validate_vnet turns namespace.vnet into a full resource ID if needed.
            validate_vnet(cmd, namespace)

            subnet = namespace.vnet + '/subnets/' + subnet
            setattr(namespace, key, subnet)

        parts = parse_resource_id(subnet)

        if parts['subscription'] != get_subscription_id(cmd.cli_ctx):
            raise InvalidArgumentValueError(
                f"{flag} subscription '{parts['subscription']}' must equal cluster subscription.")

        expected_namespace = 'microsoft.network'
        if parts['namespace'].lower() != expected_namespace:
            raise InvalidArgumentValueError(
                f"{flag} namespace '{parts['namespace']}' must equal Microsoft.Network.")

        expected_type = 'virtualnetworks'
        if parts['type'].lower() != expected_type:
            raise InvalidArgumentValueError(
                f"{flag} type '{parts['type']}' must equal virtualNetworks.")

        expected_last_child_num = 1
        if parts['last_child_num'] != expected_last_child_num:
            raise InvalidArgumentValueError(f"{flag} '{subnet}' must have one child.")

        if 'child_namespace_1' in parts:
            raise InvalidArgumentValueError(f"{flag} '{subnet}' must not have child namespace.")

        if parts['child_type_1'].lower() != 'subnets':
            raise InvalidArgumentValueError(f"{flag} child type '{subnet}' must equal subnets.")

        try:
            subnet_show(cli_ctx=cmd.cli_ctx)(command_args={
                "name": parts['child_name_1'],
                "vnet_name": parts['name'],
                "resource_group": parts['resource_group']
            })
        except ResourceNotFoundError as err:
            raise InvalidArgumentValueError(
                f"Invalid {flag}, error when getting '{subnet}': {str(err)}") from err
        except Exception as err:
            raise CLIInternalError(f"Unexpected error when getting subnet '{subnet}': {str(err)}") from err

    return _validate_subnet


def validate_subnets(master_subnet, worker_subnet):
    """Check that the master and worker subnet IDs are compatible.

    Both must be in the same resource group and the same vnet (compared
    case-insensitively) and must not name the same subnet. Raises
    InvalidArgumentValueError otherwise.
    """
    master_parts = parse_resource_id(master_subnet)
    worker_parts = parse_resource_id(worker_subnet)

    if master_parts['resource_group'].lower() != worker_parts['resource_group'].lower():
        raise InvalidArgumentValueError(
            f"--master-subnet resource group '{master_parts['resource_group']}' must equal "
            f"--worker-subnet resource group '{worker_parts['resource_group']}'.")

    if master_parts['name'].lower() != worker_parts['name'].lower():
        raise InvalidArgumentValueError(
            f"--master-subnet vnet name '{master_parts['name']}'"
            f" must equal --worker-subnet vnet name '{worker_parts['name']}'.")

    if master_parts['child_name_1'].lower() == worker_parts['child_name_1'].lower():
        raise InvalidArgumentValueError(
            f"--master-subnet name '{master_parts['child_name_1']}'"
            f" must not equal --worker-subnet name '{worker_parts['child_name_1']}'.")


def validate_visibility(key):
    """Build a validator that accepts 'Private' or 'Public' in any letter case.

    The returned callable does nothing when the attribute is None and raises
    InvalidArgumentValueError for any other value. The namespace attribute
    itself is not modified.
    """
    def _validate_visibility(namespace):
        visibility = getattr(namespace, key)
        if visibility is None:
            return

        visibility = visibility.capitalize()
        possible_visibilities = ['Private', 'Public']
        if visibility not in possible_visibilities:
            raise InvalidArgumentValueError(f"Invalid --{key.replace('_', '-')} '{visibility}'.")

    return _validate_visibility


def validate_vnet(cmd, namespace):
    """Normalise ``namespace.vnet`` into a full virtual network resource ID.

    The vnet resource group is defaulted first; a bare vnet name is then
    expanded using the current subscription and that resource group. An empty
    or missing vnet is left untouched.
    """
    validate_vnet_resource_group_name(namespace)

    if not namespace.vnet:
        return

    if not is_valid_resource_id(namespace.vnet):
        namespace.vnet = resource_id(
            subscription=get_subscription_id(cmd.cli_ctx),
            resource_group=namespace.vnet_resource_group_name,
            namespace='Microsoft.Network',
            type='virtualNetworks',
            name=namespace.vnet,
        )


def validate_vnet_resource_group_name(namespace):
    """Default the vnet resource group to the cluster resource group when unset."""
    if not namespace.vnet_resource_group_name:
        namespace.vnet_resource_group_name = namespace.resource_group_name


def validate_worker_count(namespace):
    """Require ``--worker-count``, when set, to be at least 3.

    Raises InvalidArgumentValueError otherwise.
    """
    # NOTE(review): a falsy value (None, but also 0) is treated as "not set"
    # and skips the minimum check - confirm this is intended.
    if not namespace.worker_count:
        return

    minimum_workers_count = 3
    if namespace.worker_count < minimum_workers_count:
        raise InvalidArgumentValueError('--worker-count must be greater than or equal to ' + str(minimum_workers_count))


def validate_worker_vm_disk_size_gb(namespace):
    """Require ``--worker-vm-disk-size-gb``, when set, to be at least 128.

    Raises InvalidArgumentValueError otherwise.
    """
    # NOTE(review): a falsy value (None, but also 0) is treated as "not set"
    # and skips the minimum check - confirm this is intended.
    if not namespace.worker_vm_disk_size_gb:
        return

    minimum_worker_vm_disk_size_gb = 128
    if namespace.worker_vm_disk_size_gb < minimum_worker_vm_disk_size_gb:
        error_msg = '--worker-vm-disk-size-gb must be greater than or equal to ' + str(minimum_worker_vm_disk_size_gb)
        raise InvalidArgumentValueError(error_msg)


def validate_refresh_cluster_credentials(namespace):
    """Reject arguments that conflict with ``--refresh-credentials``.

    When refresh is requested, none of client id, client secret, platform
    workload identities or ``--upgradeable-to`` may be set.
    """
    if not namespace.refresh_cluster_credentials:
        return

    # NOTE(review): this is a mutual-exclusion failure but raises
    # RequiredArgumentMissingError; the type is kept because callers may rely on it.
    if namespace.client_secret is not None or namespace.client_id is not None:
        raise RequiredArgumentMissingError('--client-id and --client-secret must be not set with --refresh-credentials.')  # pylint: disable=line-too-long
    if namespace.platform_workload_identities is not None:
        raise MutuallyExclusiveArgumentError('--platform-workload-identities must be not set with --refresh-credentials.')  # pylint: disable=line-too-long
    if namespace.upgradeable_to is not None:
        raise MutuallyExclusiveArgumentError('Must not specify --refresh-credentials when --upgradeable-to is used.')  # pylint: disable=line-too-long


def validate_version_format(namespace):
    """Require ``--version``, when given, to look like ``X.Y.Z`` with X in 4-9.

    Y and Z are one or two digits. Raises InvalidArgumentValueError otherwise.
    """
    if namespace.version is not None and not re.match(r'^[4-9]{1}\.[0-9]{1,2}\.[0-9]{1,2}$', namespace.version):
        raise InvalidArgumentValueError('--version is invalid')


def validate_upgradeable_to_format(namespace):
    """Require ``--upgradeable-to``, when given, to match the expected X.Y.Z form.

    Raises InvalidArgumentValueError otherwise.
    """
    if not namespace.upgradeable_to:
        return

    # NOTE(review): the `[1-9][0-9]` alternative already covers 10-99, so
    # minors 10-13 are accepted even though `1[4-9]` suggests a floor of 14.
    if not re.match(r'^[4-9]{1}\.(1[4-9]|[1-9][0-9])\.[0-9]{1,2}$', namespace.upgradeable_to):
        raise InvalidArgumentValueError('--upgradeable-to format is invalid')


def validate_load_balancer_managed_outbound_ip_count(namespace):
    """Require the managed outbound IP count, when given, to be within 1..20.

    Raises InvalidArgumentValueError otherwise.
    """
    if namespace.load_balancer_managed_outbound_ip_count is None:
        return

    minimum_managed_outbound_ips = 1
    maximum_managed_outbound_ips = 20
    if namespace.load_balancer_managed_outbound_ip_count < minimum_managed_outbound_ips or namespace.load_balancer_managed_outbound_ip_count > maximum_managed_outbound_ips:  # pylint: disable=line-too-long
        error_msg = f"--load-balancer-managed-outbound-ip-count must be between {minimum_managed_outbound_ips} and {maximum_managed_outbound_ips} (inclusive)."  # pylint: disable=line-too-long
        raise InvalidArgumentValueError(error_msg)


def validate_enable_managed_identity(namespace):
    """Check the arguments that must accompany ``--enable-managed-identity``.

    When managed identity is enabled: client id and client secret must be
    unset, ``--version`` must be a well-formed version of at least 4.14, and
    both platform workload identities and a cluster identity must be provided.

    Raises InvalidArgumentValueError or RequiredArgumentMissingError.
    """
    if not namespace.enable_managed_identity:
        return

    if namespace.client_id is not None:
        raise InvalidArgumentValueError('Must not specify --client-id when --enable-managed-identity is True')

    if namespace.client_secret is not None:
        raise InvalidArgumentValueError('Must not specify --client-secret when --enable-managed-identity is True')

    if namespace.version is None or not re.match(r'^[4-9]{1}\.[0-9]{1,2}\.[0-9]{1,2}$', namespace.version):
        raise InvalidArgumentValueError('Enabling managed identity requires --version >= 4.14.z')

    # Compare (major, minor) together: looking at the minor alone would reject
    # a higher major with a low minor (e.g. 5.0.z), which the pattern accepts.
    major, minor, _ = namespace.version.split('.', 2)
    if (int(major), int(minor)) < (4, 14):
        raise InvalidArgumentValueError('Enabling managed identity requires --version >= 4.14.z')

    if not namespace.platform_workload_identities:
        raise RequiredArgumentMissingError('Enabling managed identity requires platform workload identities to be provided')  # pylint: disable=line-too-long

    if not namespace.mi_user_assigned:
        raise RequiredArgumentMissingError('Enabling managed identity requires cluster identity to be provided')


def validate_platform_workload_identities(isCreate):
    """Build a validator for the platform workload identity arguments.

    The returned callable does nothing when no identities were given.
    Otherwise it requires ``--enable-managed-identity`` when ``isCreate`` is
    true, rejects operator names given more than once, expands bare identity
    names into resource IDs in the cluster resource group (mutating each
    identity's ``resource_id``), and requires every ID to be a
    userAssignedIdentity.
    """
    def _validate_platform_workload_identities(cmd, namespace):
        if namespace.platform_workload_identities is None:
            return

        if isCreate and not namespace.enable_managed_identity:
            raise RequiredArgumentMissingError('Must set --enable-managed-identity when providing platform workload identities')  # pylint: disable=line-too-long

        name_counter = Counter(name for (name, _) in namespace.platform_workload_identities)
        duplicates = [name for name, count in name_counter.items() if count > 1]
        if duplicates:
            raise InvalidArgumentValueError(f"Platform workload identities {duplicates} were provided multiple times")

        for (name, identity) in namespace.platform_workload_identities:
            if not is_valid_resource_id(identity.resource_id):
                identity.resource_id = identity_name_to_resource_id(
                    cmd, namespace, identity.resource_id)

            if not is_valid_identity_resource_id(identity.resource_id):
                raise InvalidArgumentValueError(f"Resource {identity.resource_id} used for platform workload identity {name} is not a valid userAssignedIdentity")  # pylint: disable=line-too-long

    return _validate_platform_workload_identities


def validate_cluster_identity(cmd, namespace):
    """Validate and normalise the cluster user-assigned identity argument.

    Does nothing when no identity was given. Otherwise it requires
    ``--enable-managed-identity``, expands a bare name into a resource ID in
    the cluster resource group (writing it back to the namespace), and
    requires the ID to be a userAssignedIdentity.
    """
    if namespace.mi_user_assigned is None:
        return

    if not namespace.enable_managed_identity:
        raise RequiredArgumentMissingError('Must set --enable-managed-identity when providing a cluster identity')  # pylint: disable=line-too-long

    if not is_valid_resource_id(namespace.mi_user_assigned):
        namespace.mi_user_assigned = identity_name_to_resource_id(
            cmd, namespace, namespace.mi_user_assigned)

    if not is_valid_identity_resource_id(namespace.mi_user_assigned):
        raise InvalidArgumentValueError(f"Resource {namespace.mi_user_assigned} used for cluster user assigned identity is not a valid userAssignedIdentity")  # pylint: disable=line-too-long


def validate_delete_identities(namespace):
    """Reject ``--no-wait`` when ``--delete-identities`` is set.

    Raises MutuallyExclusiveArgumentError when both are truthy.
    """
    # A None/False delete_identities short-circuits before no_wait is read.
    if namespace.delete_identities and namespace.no_wait:
        raise MutuallyExclusiveArgumentError('Must not specify --no-wait when --delete-identities is used')


def identity_name_to_resource_id(cmd, namespace, name):
    """Return the userAssignedIdentity resource ID for ``name``.

    The ID is built in the current subscription and the cluster resource group
    (``namespace.resource_group_name``).
    """
    return resource_id(
        subscription=get_subscription_id(cmd.cli_ctx),
        resource_group=namespace.resource_group_name,
        namespace='Microsoft.ManagedIdentity',
        type='userAssignedIdentities',
        name=name,
    )


def is_valid_identity_resource_id(rid):
    """Return True when ``rid`` parses as a Microsoft.ManagedIdentity userAssignedIdentity.

    Provider namespace and type are compared case-insensitively, matching how
    subnet IDs are checked in this module. An ID whose parse result lacks
    either component yields False rather than a KeyError.
    """
    parsed = parse_resource_id(rid)
    provider = (parsed.get('namespace') or '').lower()
    resource_type = (parsed.get('type') or '').lower()
    return provider == 'microsoft.managedidentity' and \
        resource_type == 'userassignedidentities'