in src/azure-cli/azure/cli/command_modules/vm/_validators.py [0:0]
def _validate_vm_create_storage_profile(cmd, namespace, for_scale_set=False):
    """Resolve the storage profile for a VM / scale set create and validate the disk-related arguments.

    The function works in these stages, all of which mutate ``namespace`` in place:

    1. Pick ``namespace.storage_profile`` from the combination of ``image``,
       ``attach_os_disk`` and ``use_unmanaged_disk`` (a default image is filled in
       when neither an image nor an OS disk is given).
    2. Build the ``required`` / ``forbidden`` parameter lists for that profile and
       check them with ``validate_parameter_set``.
    3. Look up extra information for custom / gallery images (``os_type``, data disk LUNs)
       and resolve attached disks through ``_get_resource_id``.
    4. Cross-check the security-type / OS-disk-encryption options and the
       "source snapshot / restore point" list pairs.
    5. Store the result of ``normalize_disk_info`` in ``namespace.disk_info``.

    :param cmd: command object; only ``cmd.cli_ctx`` is used here (client factories, resource ID resolution).
    :param namespace: parsed argument namespace; read and updated in place.
    :param for_scale_set: when True, ``attach_os_disk`` and ``storage_account`` are dropped from the
        required / forbidden checks, ``os_disk_name`` is forbidden for the managed image profiles,
        and the default storage SKU is ``Standard_LRS`` instead of ``Premium_LRS``.
    :raises CLIError: bad ``--specialized`` usage, both ``--image`` and ``--attach-os-disk`` given,
        unrecognized image type / storage profile / image resource type, or no usable
        "latest" gallery image version.
    :raises RequiredArgumentMissingError: missing ``--location`` for shared / community gallery images, or
        missing ``--os-disk-security-encryption-type`` when the security type is ``ConfidentialVM``.
    :raises ArgumentUsageError: inconsistent os type, encryption or source-disk arguments.
    """
    from azure.mgmt.core.tools import parse_resource_id

    _validate_vm_vmss_create_ephemeral_placement(namespace)

    # specialized is only for image
    if getattr(namespace, 'specialized', None) is not None and namespace.image is None:
        raise CLIError('usage error: --specialized is only configurable when --image is specified.')

    # use minimal parameters to resolve the expected storage profile
    # ('attach_os_disk' is read with getattr because the attribute may be absent from the namespace)
    if getattr(namespace, 'attach_os_disk', None) and not namespace.image:
        # an existing OS disk is attached and no image is given
        if namespace.use_unmanaged_disk:
            # STORAGE PROFILE #3
            namespace.storage_profile = StorageProfile.SASpecializedOSDisk
        else:
            # STORAGE PROFILE #6
            namespace.storage_profile = StorageProfile.ManagedSpecializedOSDisk
    elif namespace.image and not getattr(namespace, 'attach_os_disk', None):
        # an image is given: the profile depends on the kind of image reference
        image_type = _parse_image_argument(cmd, namespace)
        if image_type == 'uri':
            # STORAGE PROFILE #2
            namespace.storage_profile = StorageProfile.SACustomImage
        elif image_type == 'image_id':
            # STORAGE PROFILE #5
            namespace.storage_profile = StorageProfile.ManagedCustomImage
        elif image_type == 'shared_gallery_image_id':
            namespace.storage_profile = StorageProfile.SharedGalleryImage
        elif image_type == 'community_gallery_image_id':
            namespace.storage_profile = StorageProfile.CommunityGalleryImage
        elif image_type == 'urn':
            if namespace.use_unmanaged_disk:
                # STORAGE PROFILE #1
                namespace.storage_profile = StorageProfile.SAPirImage
            else:
                # STORAGE PROFILE #4
                namespace.storage_profile = StorageProfile.ManagedPirImage
        else:
            raise CLIError('Unrecognized image type: {}'.format(image_type))
    elif not namespace.image and not getattr(namespace, 'attach_os_disk', None):
        # neither --image nor --attach-os-disk: fall back to a default Windows Server image
        namespace.image = 'MicrosoftWindowsServer:WindowsServer:2022-datacenter-azure-edition:latest'
        # called for its effect on the namespace; the returned image type is not needed here
        _parse_image_argument(cmd, namespace)
        namespace.storage_profile = StorageProfile.ManagedPirImage
        # for the default image, security settings the user left unset (None) are switched on;
        # explicit user values (including False) are kept
        if namespace.enable_secure_boot is None:
            namespace.enable_secure_boot = True
        if namespace.enable_vtpm is None:
            namespace.enable_vtpm = True
        if namespace.security_type is None:
            namespace.security_type = 'TrustedLaunch'
    else:
        # did not specify image XOR attach-os-disk
        # (only reachable when both --image and --attach-os-disk are set)
        raise CLIError('incorrect usage: --image IMAGE | --attach-os-disk DISK')

    # authentication parameters; they are forbidden for the two "specialized OS disk" profiles below
    auth_params = ['admin_password', 'admin_username', 'authentication_type',
                   'generate_ssh_keys', 'ssh_dest_key_path', 'ssh_key_value']

    # perform parameter validation for the specific storage profile
    # start with the required/forbidden parameters for VM
    if namespace.storage_profile == StorageProfile.ManagedPirImage:
        required = ['image']
        forbidden = ['os_type', 'attach_os_disk', 'storage_account',
                     'storage_container_name', 'use_unmanaged_disk']
        if for_scale_set:
            forbidden.append('os_disk_name')
    elif namespace.storage_profile == StorageProfile.ManagedCustomImage:
        required = ['image']
        forbidden = ['os_type', 'attach_os_disk', 'storage_account',
                     'storage_container_name', 'use_unmanaged_disk']
        if for_scale_set:
            forbidden.append('os_disk_name')
    elif namespace.storage_profile == StorageProfile.SharedGalleryImage:
        # os_type is allowed here; it is checked against the gallery image further down
        required = ['image']
        forbidden = ['attach_os_disk', 'storage_account', 'storage_container_name', 'use_unmanaged_disk']
    elif namespace.storage_profile == StorageProfile.CommunityGalleryImage:
        # os_type is allowed here; it is checked against the gallery image further down
        required = ['image']
        forbidden = ['attach_os_disk', 'storage_account', 'storage_container_name', 'use_unmanaged_disk']
    elif namespace.storage_profile == StorageProfile.ManagedSpecializedOSDisk:
        required = ['os_type', 'attach_os_disk']
        forbidden = ['os_disk_name', 'os_caching', 'storage_account', 'ephemeral_os_disk',
                     'storage_container_name', 'use_unmanaged_disk', 'storage_sku'] + auth_params
    elif namespace.storage_profile == StorageProfile.SAPirImage:
        required = ['image', 'use_unmanaged_disk']
        forbidden = ['os_type', 'attach_os_disk', 'data_disk_sizes_gb', 'ephemeral_os_disk']
    elif namespace.storage_profile == StorageProfile.SACustomImage:
        required = ['image', 'os_type', 'use_unmanaged_disk']
        forbidden = ['attach_os_disk', 'data_disk_sizes_gb', 'ephemeral_os_disk']
    elif namespace.storage_profile == StorageProfile.SASpecializedOSDisk:
        required = ['os_type', 'attach_os_disk', 'use_unmanaged_disk']
        forbidden = ['os_disk_name', 'os_caching', 'image', 'storage_account', 'ephemeral_os_disk',
                     'storage_container_name', 'data_disk_sizes_gb', 'storage_sku'] + auth_params
    else:
        raise CLIError('Unrecognized storage profile: {}'.format(namespace.storage_profile))

    logger.debug("storage profile '%s'", namespace.storage_profile)

    if for_scale_set:
        # VMSS lacks some parameters, so scrub these out
        props_to_remove = ['attach_os_disk', 'storage_account']
        for prop in props_to_remove:
            if prop in required:
                required.remove(prop)
            if prop in forbidden:
                forbidden.remove(prop)

    # set default storage SKU if not provided and using an image based OS
    # (only the two image profiles that require use_unmanaged_disk get a default here)
    if not namespace.storage_sku and namespace.storage_profile in [StorageProfile.SAPirImage, StorageProfile.SACustomImage]:  # pylint: disable=line-too-long
        namespace.storage_sku = ['Standard_LRS'] if for_scale_set else ['Premium_LRS']

    # an UltraSSD_LRS entry anywhere in storage_sku turns ultra_ssd_enabled on,
    # but only when the user left the flag unset (None)
    if namespace.ultra_ssd_enabled is None and namespace.storage_sku:
        for sku in namespace.storage_sku:
            if 'ultrassd_lrs' in sku.lower():
                namespace.ultra_ssd_enabled = True

    # Now verify the presence of required and absence of forbidden parameters
    validate_parameter_set(
        namespace, required, forbidden,
        description='storage profile: {}:'.format(_get_storage_profile_description(namespace.storage_profile)))

    # data disks that come with the image, reduced to [{'lun': ...}, ...]; stays empty for other profiles
    image_data_disks = []
    if namespace.storage_profile == StorageProfile.ManagedCustomImage:
        # extract additional information from a managed custom image
        res = parse_resource_id(namespace.image)
        # the image's subscription is recorded on the namespace and used for the compute client
        namespace.aux_subscriptions = [res['subscription']]
        compute_client = _compute_client_factory(cmd.cli_ctx, subscription_id=res['subscription'])
        if res['type'].lower() == 'images':
            # 'images' resource: os type and data disks are read from the image's storage profile
            image_info = compute_client.images.get(res['resource_group'], res['name'])
            namespace.os_type = image_info.storage_profile.os_disk.os_type
            image_data_disks = image_info.storage_profile.data_disks or []
            image_data_disks = [{'lun': disk.lun} for disk in image_data_disks]

        elif res['type'].lower() == 'galleries':
            # 'galleries' resource: os type comes from the image definition (child_name_1),
            # data disks from a concrete image version (child_name_2)
            image_info = compute_client.gallery_images.get(resource_group_name=res['resource_group'],
                                                           gallery_name=res['name'],
                                                           gallery_image_name=res['child_name_1'])
            namespace.os_type = image_info.os_type
            gallery_image_version = res.get('child_name_2', '')
            if gallery_image_version.lower() in ['latest', '']:
                # no explicit version (or 'latest'): list all versions, drop those flagged
                # excludeFromLatest, and take the one with the greatest publishedDate
                from .aaz.latest.sig.image_version import List as _SigImageVersionList
                image_version_infos = _SigImageVersionList(cli_ctx=cmd.cli_ctx)(command_args={
                    "resource_group": res['resource_group'],
                    "gallery_name": res['name'],
                    "gallery_image_definition": res['child_name_1'],
                    "subscription": res['subscription']
                })
                image_version_infos = [x for x in image_version_infos
                                       if not x.get("publishingProfile", {}).get("excludeFromLatest", None)]
                if not image_version_infos:
                    raise CLIError('There is no latest image version exists for "{}"'.format(namespace.image))
                # NOTE(review): the sort key indexes "publishingProfile"/"publishedDate" directly, while the
                # filter above tolerates a missing "publishingProfile" - confirm both keys are always present
                image_version_info = sorted(image_version_infos,
                                            key=lambda x: x["publishingProfile"]["publishedDate"])[-1]
                image_data_disks = image_version_info.get("storageProfile", {}).get("dataDiskImages", []) or []
                image_data_disks = [{'lun': disk["lun"]} for disk in image_data_disks]
            else:
                # explicit version: fetch exactly that image version
                from .aaz.latest.sig.image_version import Show as _SigImageVersionShow
                image_version_info = _SigImageVersionShow(cli_ctx=cmd.cli_ctx)(command_args={
                    "resource_group": res['resource_group'],
                    "gallery_name": res['name'],
                    "gallery_image_definition": res['child_name_1'],
                    "gallery_image_version_name": res['child_name_2'],
                    "subscription": res['subscription']
                })
                image_data_disks = image_version_info.get("storageProfile", {}).get("dataDiskImages", []) or []
                image_data_disks = [{'lun': disk["lun"]} for disk in image_data_disks]
        else:
            raise CLIError('usage error: unrecognized image information "{}"'.format(namespace.image))

        # pylint: disable=no-member

    elif namespace.storage_profile == StorageProfile.ManagedSpecializedOSDisk:
        # accept disk name or ID
        namespace.attach_os_disk = _get_resource_id(
            cmd.cli_ctx, namespace.attach_os_disk, namespace.resource_group_name, 'disks', 'Microsoft.Compute')

    # data disks to attach are resolved the same way as the OS disk above, unless unmanaged disks are used
    if getattr(namespace, 'attach_data_disks', None):
        if not namespace.use_unmanaged_disk:
            namespace.attach_data_disks = [_get_resource_id(cmd.cli_ctx, d, namespace.resource_group_name, 'disks',
                                                            'Microsoft.Compute') for d in namespace.attach_data_disks]

    if namespace.storage_profile == StorageProfile.SharedGalleryImage:

        # the shared gallery image lookup below is keyed by location, so --location must be given
        if namespace.location is None:
            raise RequiredArgumentMissingError(
                'Please input the location of the shared gallery image through the parameter --location.')

        from ._vm_utils import parse_shared_gallery_image_id
        # image_info[0] is passed as the gallery unique name, image_info[1] as the image name
        image_info = parse_shared_gallery_image_id(namespace.image)

        from ._client_factory import cf_shared_gallery_image
        shared_gallery_image_info = cf_shared_gallery_image(cmd.cli_ctx).get(
            location=namespace.location, gallery_unique_name=image_info[0], gallery_image_name=image_info[1])

        # a user-supplied --os-type must match the image (case-insensitive); the image's value then wins
        if namespace.os_type and namespace.os_type.lower() != shared_gallery_image_info.os_type.lower():
            raise ArgumentUsageError("The --os-type is not the correct os type of this shared gallery image, "
                                     "the os type of this image should be {}".format(shared_gallery_image_info.os_type))
        namespace.os_type = shared_gallery_image_info.os_type

    if namespace.storage_profile == StorageProfile.CommunityGalleryImage:

        # the community gallery image lookup below is keyed by location, so --location must be given
        if namespace.location is None:
            raise RequiredArgumentMissingError(
                'Please input the location of the community gallery image through the parameter --location.')

        from ._vm_utils import parse_community_gallery_image_id
        # image_info[0] is passed as the public gallery name, image_info[1] as the image name
        image_info = parse_community_gallery_image_id(namespace.image)

        from ._client_factory import cf_community_gallery_image
        community_gallery_image_info = cf_community_gallery_image(cmd.cli_ctx).get(
            location=namespace.location, public_gallery_name=image_info[0], gallery_image_name=image_info[1])

        # a user-supplied --os-type must match the image (case-insensitive); the image's value then wins
        if namespace.os_type and namespace.os_type.lower() != community_gallery_image_info.os_type.lower():
            raise ArgumentUsageError(
                "The --os-type is not the correct os type of this community gallery image, "
                "the os type of this image should be {}".format(community_gallery_image_info.os_type))
        namespace.os_type = community_gallery_image_info.os_type

    # ConfidentialVM requires an OS disk security encryption type
    if getattr(namespace, 'security_type', None) == 'ConfidentialVM' and \
            not getattr(namespace, 'os_disk_security_encryption_type', None):
        raise RequiredArgumentMissingError('usage error: "--os-disk-security-encryption-type" is required '
                                           'when "--security-type" is specified as "ConfidentialVM"')

    # a secure-VM disk encryption set is only accepted together with the DiskWithVMGuestState encryption type
    if getattr(namespace, 'os_disk_secure_vm_disk_encryption_set', None) and \
            getattr(namespace, 'os_disk_security_encryption_type', None) != 'DiskWithVMGuestState':
        raise ArgumentUsageError(
            'usage error: The "--os-disk-secure-vm-disk-encryption-set" can only be passed in '
            'when "--os-disk-security-encryption-type" is "DiskWithVMGuestState"')

    # NonPersistedTPM (compared case-insensitively) needs ConfidentialVM and a truthy enable_vtpm
    os_disk_security_encryption_type = getattr(namespace, 'os_disk_security_encryption_type', None)
    if os_disk_security_encryption_type and os_disk_security_encryption_type.lower() == 'nonpersistedtpm':
        if ((getattr(namespace, 'security_type', None) != 'ConfidentialVM') or
                not getattr(namespace, 'enable_vtpm', None)):
            raise ArgumentUsageError(
                'usage error: The "--os-disk-security-encryption-type NonPersistedTPM" can only be passed in '
                'when "--security-type" is "ConfidentialVM" and "--enable-vtpm" is True')

    # os_type still unset: derive it from the image offer name ('windows' substring, case-insensitive)
    # NOTE(review): presumably os_offer was populated by _parse_image_argument - confirm it cannot be None here
    if not namespace.os_type:
        namespace.os_type = 'windows' if 'windows' in namespace.os_offer.lower() else 'linux'

    # --source-snapshots-or-disks and its size list must be given together and have the same length
    if getattr(namespace, 'source_snapshots_or_disks', None) and \
            getattr(namespace, 'source_snapshots_or_disks_size_gb', None):
        if len(namespace.source_snapshots_or_disks) != len(namespace.source_snapshots_or_disks_size_gb):
            raise ArgumentUsageError(
                'Length of --source-snapshots-or-disks, --source-snapshots-or-disks-size-gb must be same.')
    elif getattr(namespace, 'source_snapshots_or_disks', None) or \
            getattr(namespace, 'source_snapshots_or_disks_size_gb', None):
        # exactly one of the two was supplied
        raise ArgumentUsageError('usage error: --source-snapshots-or-disks and '
                                 '--source-snapshots-or-disks-size-gb must be used together')

    # same pairing rule for --source-disk-restore-point and its size list
    if getattr(namespace, 'source_disk_restore_point', None) and \
            getattr(namespace, 'source_disk_restore_point_size_gb', None):
        if len(namespace.source_disk_restore_point) != len(namespace.source_disk_restore_point_size_gb):
            raise ArgumentUsageError(
                'Length of --source-disk-restore-point, --source-disk-restore-point-size-gb must be same.')
    elif getattr(namespace, 'source_disk_restore_point', None) or \
            getattr(namespace, 'source_disk_restore_point_size_gb', None):
        # exactly one of the two was supplied
        raise ArgumentUsageError('usage error: --source-disk-restore-point and '
                                 '--source-disk-restore-point-size-gb must be used together')

    from ._vm_utils import normalize_disk_info
    # attach_data_disks are not exposed yet for VMSS, so use 'getattr' to avoid crash
    # the size is taken from 'size' if set, otherwise from 'vm_sku'
    vm_size = (getattr(namespace, 'size', None) or getattr(namespace, 'vm_sku', None))
    # pylint: disable=line-too-long
    namespace.disk_info = normalize_disk_info(size=vm_size,
                                              image_data_disks=image_data_disks,
                                              data_disk_sizes_gb=namespace.data_disk_sizes_gb,
                                              attach_data_disks=getattr(namespace, 'attach_data_disks', []),
                                              storage_sku=namespace.storage_sku,
                                              os_disk_caching=namespace.os_caching,
                                              data_disk_cachings=namespace.data_caching,
                                              ephemeral_os_disk=getattr(namespace, 'ephemeral_os_disk', None),
                                              ephemeral_os_disk_placement=getattr(namespace, 'ephemeral_os_disk_placement', None),
                                              data_disk_delete_option=getattr(
                                                  namespace, 'data_disk_delete_option', None),
                                              source_snapshots_or_disks=getattr(namespace, 'source_snapshots_or_disks', None),
                                              source_snapshots_or_disks_size_gb=getattr(namespace, 'source_snapshots_or_disks_size_gb', None),
                                              source_disk_restore_point=getattr(namespace, 'source_disk_restore_point', None),
                                              source_disk_restore_point_size_gb=getattr(namespace, 'source_disk_restore_point_size_gb', None)
                                              )