in src/vm-repair/azext_vm_repair/custom.py [0:0]
def create(cmd, vm_name, resource_group_name, repair_password=None, repair_username=None, repair_vm_name=None, copy_disk_name=None, repair_group_name=None, unlock_encrypted_vm=False, enable_nested=False, associate_public_ip=False, distro='ubuntu', yes=False, encrypt_recovery_key="", disable_trusted_launch=False, os_disk_type=None):
"""
This function creates a repair VM.
Parameters:
- cmd: The Azure CLI command context object.
- vm_name: The name of the virtual machine.
- resource_group_name: The name of the resource group.
- repair_password: The password for the repair VM. If not provided, a default will be used.
- repair_username: The username for the repair VM. If not provided, a default will be used.
- repair_vm_name: The name of the repair VM. If not provided, a default will be used.
- copy_disk_name: The name of the disk to be copied. If not provided, the OS disk of the source VM will be used.
- repair_group_name: The name of the repair group. If not provided, a default will be used.
- unlock_encrypted_vm: If True, the encrypted VM will be unlocked. Default is False.
- enable_nested: If True, nested virtualization will be enabled. Default is False.
- associate_public_ip: If True, a public IP will be associated with the VM. Default is False.
- distro: The Linux distribution to use for the repair VM. Default is 'ubuntu'.
- yes: If True, confirmation prompts will be skipped. Default is False.
- encrypt_recovery_key: The BitLocker recovery key of the encrypted source VM, used to unlock the copied disk. Default is an empty string.
- disable_trusted_launch: If set, the security type of the repair VM is set to Standard. Default is False.
- os_disk_type: Set the OS disk storage account type of the repair VM. Default is Premium SSD (Premium_LRS).
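Returns:
- A dict with the command result fields from command.init_return_dict(), plus, on success: repair_vm_name, copied_disk_name, copied_disk_uri, repair_resource_group, resource_tag and created_resources.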
"""
# Log all command parameters, masking sensitive values (password, username, recovery key).
masked_repair_password = '****' if repair_password else None
masked_repair_username = '****' if repair_username else None
masked_repair_encrypt_recovery_key = '****' if encrypt_recovery_key else None
logger.debug('vm repair create command parameters: vm_name: %s, resource_group_name: %s, repair_password: %s, repair_username: %s, repair_vm_name: %s, copy_disk_name: %s, repair_group_name: %s, unlock_encrypted_vm: %s, enable_nested: %s, associate_public_ip: %s, distro: %s, yes: %s, encrypt_recovery_key: %s, disable_trusted_launch: %s, os_disk_type: %s',
vm_name, resource_group_name, masked_repair_password, masked_repair_username, repair_vm_name, copy_disk_name, repair_group_name, unlock_encrypted_vm, enable_nested, associate_public_ip, distro, yes, masked_repair_encrypt_recovery_key, disable_trusted_launch, os_disk_type)
# Initializing a command helper object.
command = command_helper(logger, cmd, 'vm repair create')
# The main command execution block.
try:
# Initialize variables referenced in exception handling to avoid UnboundLocalError:
existing_rg = None
copy_disk_id = None
# Fetching the data of the source VM.
source_vm = get_vm(cmd, resource_group_name, vm_name)
source_vm_instance_view = get_vm(cmd, resource_group_name, vm_name, 'instanceView')
# Checking if the OS of the source VM is Linux and what the Hyper-V generation is.
is_linux = _is_linux_os(source_vm)
vm_hypervgen = _is_gen2(source_vm_instance_view)
# Fetching the name of the OS disk and checking if it's managed.
target_disk_name = source_vm.storage_profile.os_disk.name
is_managed = _uses_managed_disk(source_vm)
# Fetching the tag for the repair resource and initializing the list of created resources.
resource_tag = _get_repair_resource_tag(resource_group_name, vm_name)
created_resources = []
# Fetching the architecture of the source VM.
architecture_type = _fetch_architecture(source_vm)
# Checking if the source VM's OS is Linux and if it uses a managed disk.
if is_linux and is_managed:
# Setting the OS type to 'Linux'.
os_type = 'Linux'
# Checking the Hyper-V generation of the source VM.
hyperV_generation_linux = _check_linux_hyperV_gen(source_vm)
if hyperV_generation_linux == 'V2':
# For a Gen2 image, check whether the architecture is Arm64:
if architecture_type == 'Arm64':
# If the architecture type is 'Arm64', log this information and select the Linux distribution for an Arm64 VM.
logger.info('ARM64 VM detected')
os_image_urn = _select_distro_linux_Arm64(distro)
# Trusted launch is not supported on ARM
logger.info('Disabling trusted launch on ARM')
disable_trusted_launch = True
else:
# Log this and select the Linux distribution image for an x86_64 Gen2 VM.
logger.info('Generation 2 VM detected')
os_image_urn = _select_distro_linux_gen2(distro)
else:
# If the Hyper-V generation is not 'V2', select a Gen1 Linux distribution image.
os_image_urn = _select_distro_linux(distro)
else:
# If the source VM's OS is not Linux (the Windows path), check whether a BitLocker recovery key was provided.
if encrypt_recovery_key:
# If a recovery key is provided, fetch the compatible Windows OS URN for a VM with BitLocker encryption.
os_image_urn = _fetch_compatible_windows_os_urn_v2(source_vm)
else:
# If no recovery key is provided, fetch the compatible Windows OS URN for a regular VM.
os_image_urn = _fetch_compatible_windows_os_urn(source_vm)
# Setting the OS type to 'Windows'.
os_type = 'Windows'
# Set public IP address for repair VM
public_ip_name = _make_public_ip_name(repair_vm_name, associate_public_ip)
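# Note: when associate_public_ip is False, the helper is expected to yield an empty value so that
# passing --public-ip-address "" below tells 'az vm create' not to create a public IP (assumption about the helper's contract).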
# Set up base create vm command
if is_linux:
create_repair_vm_command = 'az vm create -g {g} -n {n} --tag {tag} --image {image} --admin-username {username} --admin-password {password} --public-ip-address {option} --custom-data {cloud_init_script}' \
.format(g=repair_group_name, n=repair_vm_name, tag=resource_tag, image=os_image_urn, username=repair_username, password=repair_password, option=public_ip_name, cloud_init_script=_get_cloud_init_script())
else:
create_repair_vm_command = 'az vm create -g {g} -n {n} --tag {tag} --image {image} --admin-username {username} --admin-password {password} --public-ip-address {option}' \
.format(g=repair_group_name, n=repair_vm_name, tag=resource_tag, image=os_image_urn, username=repair_username, password=repair_password, option=public_ip_name)
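# Illustrative shape of the rendered command (placeholder values only):
#   az vm create -g <repair_group> -n <repair_vm> --tag <resource_tag> --image <urn_or_alias>
#       --admin-username <username> --admin-password <password> --public-ip-address <name_or_empty>
#       [--custom-data <cloud_init_script>]   # Linux only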
# Selecting a compatible VM size (SKU) for the repair VM.
sku = _fetch_compatible_sku(source_vm, enable_nested)
if not sku:
# If no compatible size is found, raise an error.
raise SkuNotAvailableError('Failed to find compatible VM size for source VM\'s OS disk within given region and subscription.')
# Adding the size to the command.
create_repair_vm_command += ' --size {sku}'.format(sku=sku)
# Setting the availability zone for the repair VM.
# If the source VM has availability zones, the first one is chosen for the repair VM.
if source_vm.zones:
zone = source_vm.zones[0]
create_repair_vm_command += ' --zone {zone}'.format(zone=zone)
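# A zonal managed disk can only be attached to a VM in the same zone, so the repair VM is
# created in the source VM's first zone, matching the copied disk created later.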
if disable_trusted_launch:
logger.debug('Set security-type to Standard...')
create_repair_vm_command += ' --security-type Standard'
else:
# If a BitLocker recovery key is provided, this indicates the source VM is encrypted.
# In this case, the VM and OS disk security profiles need to be fetched and added to the repair VM creation command.
if encrypt_recovery_key:
# TODO: this was assumed to also be needed for Trusted Launch VMs, but that does not appear to be the case.
# For confidential VMs, some SKUs expect specific security types with secure_boot_enabled and vtpm_enabled set.
# Fetching the VM security profile and adding it to the command if it exists.
logger.debug('Fetching VM security profile...')
vm_security_params = _fetch_vm_security_profile_parameters(source_vm)
if vm_security_params:
create_repair_vm_command += vm_security_params
if encrypt_recovery_key:
# TODO: this was assumed to also be needed for Trusted Launch VMs, but that does not appear to be the case.
# For confidential VMs and Trusted Launch VMs, the disk security profile (security tags) needs to be carried over as well.
# Fetching the OS Disk security profile and adding it to the command if it exists.
logger.debug('Fetching OS Disk security profile...')
osdisk_security_params = _fetch_osdisk_security_profile_parameters(source_vm)
if osdisk_security_params:
create_repair_vm_command += osdisk_security_params
# Creating a new resource group for the repair VM and its resources.
# First, check if the repair group already exists.
# If it doesn't, create a new resource group at the same location as the source VM.
existing_rg = _check_existing_rg(repair_group_name)
if not existing_rg:
create_resource_group_command = 'az group create -l {loc} -n {group_name}' \
.format(loc=source_vm.location, group_name=repair_group_name)
logger.info('Creating resource group for repair VM and its resources...')
_call_az_command(create_resource_group_command)
# Check whether the user is overriding the repair VM OS disk type.
if os_disk_type:
create_repair_vm_command += ' --storage-sku {os_disk_type} '.format(os_disk_type=os_disk_type)
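# --storage-sku on 'az vm create' controls the managed disk SKU of the repair VM, e.g. Premium_LRS, StandardSSD_LRS or Standard_LRS.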
# Check if the source VM uses managed disks.
# If it does, the repair VM will also be created with managed disks.
if is_managed:
logger.info('Source VM uses managed disks. Creating repair VM with managed disks.\n')
# Fetch the SKU, location, OS type, and Hyper-V generation of the disk from the source VM.
disk_sku, location, os_type, hyperV_generation = _fetch_disk_info(resource_group_name, target_disk_name)
# Prepare the command to create a copy of the source VM's OS disk.
# The command includes the resource group name, copy disk name, target disk name, SKU, location, and OS type.
copy_disk_command = 'az disk create -g {g} -n {n} --source {s} --sku {sku} --location {loc} --os-type {os_type} --query id -o tsv' \
.format(g=resource_group_name, n=copy_disk_name, s=target_disk_name, sku=disk_sku, loc=location, os_type=os_type)
# If the Hyper-V generation for the disk is available, append it to the copy disk command.
if hyperV_generation:
copy_disk_command += ' --hyper-v-generation {hyperV}'.format(hyperV=hyperV_generation)
# If the source VM is a Linux Gen2 VM but the Hyper-V generation is not available in the disk info,
# log this situation and manually add 'V2' to the copy disk command.
elif is_linux and hyperV_generation_linux == 'V2':
logger.info('The disk info does not include the Hyper-V generation, but the source VM was created from a Gen2 image')
copy_disk_command += ' --hyper-v-generation {hyperV}'.format(hyperV=hyperV_generation_linux)
# If the source VM has availability zones, get the first one and add it to the copy disk command.
if source_vm.zones:
zone = source_vm.zones[0]
copy_disk_command += ' --zone {zone}'.format(zone=zone)
# Execute the command to create a copy of the OS disk of the source VM.
logger.info('Copying OS disk of source VM...')
copy_disk_id = _call_az_command(copy_disk_command).strip('\n')
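# copy_disk_id now holds the ARM resource ID of the new managed disk (from '--query id -o tsv'),
# e.g. /subscriptions/<sub>/resourceGroups/<rg>/providers/Microsoft.Compute/disks/<copy_disk_name>.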
# Depending on the operating system of the source VM and whether it's encrypted, different steps are taken.
# If the source VM is not a Linux machine, create the repair VM.
# This is the case for Windows VMs, both encrypted and not encrypted.
if not is_linux:
# Call the method to create the repair VM, providing the necessary parameters.
_create_repair_vm(copy_disk_id, create_repair_vm_command, repair_password, repair_username)
# If the source VM is a Windows machine and it is encrypted, unlock the encrypted VM after creation.
if not is_linux and unlock_encrypted_vm:
# Call the method to create the repair VM.
_create_repair_vm(copy_disk_id, create_repair_vm_command, repair_password, repair_username)
# Call the method to unlock the encrypted VM, providing the necessary parameters.
_unlock_encrypted_vm_run(repair_vm_name, repair_group_name, is_linux, encrypt_recovery_key)
# If the source VM is a Linux machine and it is encrypted, create the repair VM and then unlock it.
if is_linux and unlock_encrypted_vm:
# Call the method to create the repair VM.
_create_repair_vm(copy_disk_id, create_repair_vm_command, repair_password, repair_username)
# Call the method to unlock the encrypted VM.
_unlock_encrypted_vm_run(repair_vm_name, repair_group_name, is_linux)
# If the source VM is a Linux machine and it is not encrypted, create the repair VM and then attach the data disk.
# This is done after VM creation to avoid a UUID mismatch causing an incorrect boot.
if is_linux and (not unlock_encrypted_vm):
# Call the method to create the repair VM, with the fix_uuid parameter set to True.
_create_repair_vm(copy_disk_id, create_repair_vm_command, repair_password, repair_username, fix_uuid=True)
logger.info('Attaching copied disk to repair VM as data disk...')
# Set up the command to attach the copied disk to the repair VM.
attach_disk_command = "az vm disk attach -g {g} --name {disk_id} --vm-name {vm_name} ".format(g=repair_group_name, disk_id=copy_disk_id, vm_name=repair_vm_name)
# Execute the command to attach the disk.
_call_az_command(attach_disk_command)
# Check if the source VM uses unmanaged disks.
# If it does, the repair VM will also be created with unmanaged disks.
else:
logger.info('Source VM uses unmanaged disks. Creating repair VM with unmanaged disks.\n')
# Get the URI of the OS disk from the source VM.
os_disk_uri = source_vm.storage_profile.os_disk.vhd.uri
# Create the name of the copy disk by appending '.vhd' to the existing name.
copy_disk_name = copy_disk_name + '.vhd'
# Create a StorageResourceIdentifier object for the storage account associated with the OS disk.
storage_account = StorageResourceIdentifier(cmd.cli_ctx.cloud, os_disk_uri)
# Validate the VM creation command to ensure all parameters are correct before proceeding.
validate_create_vm_command = create_repair_vm_command + ' --validate'
logger.info('Validating VM template before continuing...')
_call_az_command(validate_create_vm_command, secure_params=[repair_password, repair_username])
# Fetch the connection string of the storage account.
get_connection_string_command = 'az storage account show-connection-string -g {g} -n {n} --query connectionString -o tsv' \
.format(g=resource_group_name, n=storage_account.account_name)
logger.debug('Fetching storage account connection string...')
connection_string = _call_az_command(get_connection_string_command).strip('\n')
# Create a snapshot of the unmanaged OS disk.
make_snapshot_command = 'az storage blob snapshot -c {c} -n {n} --connection-string "{con_string}" --query snapshot -o tsv' \
.format(c=storage_account.container, n=storage_account.blob, con_string=connection_string)
logger.info('Creating snapshot of OS disk...')
snapshot_timestamp = _call_az_command(make_snapshot_command, secure_params=[connection_string]).strip('\n')
snapshot_uri = os_disk_uri + '?snapshot={timestamp}'.format(timestamp=snapshot_timestamp)
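# Blob snapshots are addressed by appending '?snapshot=<timestamp>' to the base blob URI;
# this snapshot URI is used as the source of the copy below.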
# Create a copy of the snapshot into an unmanaged disk.
copy_snapshot_command = 'az storage blob copy start -c {c} -b {name} --source-uri {source} --connection-string "{con_string}"' \
.format(c=storage_account.container, name=copy_disk_name, source=snapshot_uri, con_string=connection_string)
logger.info('Creating a copy disk from the snapshot...')
_call_az_command(copy_snapshot_command, secure_params=[connection_string])
# Generate the URI of the copied disk.
copy_disk_id = os_disk_uri[:-len(storage_account.blob)] + copy_disk_name
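# For the unmanaged case, copy_disk_id is the VHD blob URI of the copy,
# e.g. https://<account>.blob.core.windows.net/<container>/<copy_disk_name>.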
# Create the repair VM with the copied unmanaged disk.
create_repair_vm_command = create_repair_vm_command + ' --use-unmanaged-disk'
logger.info('Creating repair VM while disk copy is in progress...')
_call_az_command(create_repair_vm_command, secure_params=[repair_password, repair_username])
# Check if the disk copy process is done.
logger.info('Checking if disk copy is done...')
copy_check_command = 'az storage blob show -c {c} -n {name} --connection-string "{con_string}" --query properties.copy.status -o tsv' \
.format(c=storage_account.container, name=copy_disk_name, con_string=connection_string)
copy_result = _call_az_command(copy_check_command, secure_params=[connection_string]).strip('\n')
# If the disk copy process failed, raise an error.
if copy_result != 'success':
raise UnmanagedDiskCopyError('Unmanaged disk copy failed.')
# Attach the copied unmanaged disk to the repair VM.
logger.info('Attaching copied disk to repair VM as data disk...')
attach_disk_command = "az vm unmanaged-disk attach -g {g} -n {disk_name} --vm-name {vm_name} --vhd-uri {uri}" \
.format(g=repair_group_name, disk_name=copy_disk_name, vm_name=repair_vm_name, uri=copy_disk_id)
_call_az_command(attach_disk_command)
# Check if the Nested Hyper-V needs to be enabled.
# If it does, run the script to install Hyper-V and create the nested VM.
if enable_nested:
logger.info("Running Script win-enable-nested-hyperv.ps1 to install HyperV")
# Set up the command to run the script to enable Nested Hyper-V.
run_hyperv_command = "az vm repair run -g {g} -n {name} --run-id win-enable-nested-hyperv --parameters gen={gen}" \
.format(g=repair_group_name, name=repair_vm_name, gen=vm_hypervgen)
# Execute the command to enable Nested Hyper-V.
ret_enable_nested = _call_az_command(run_hyperv_command)
logger.debug("az vm repair run hyperv command returned: %s", ret_enable_nested)
# If the script indicates that a restart is required, restart the repair VM.
if "SuccessRestartRequired" in ret_enable_nested:
restart_cmd = 'az vm restart -g {rg} -n {vm}'.format(rg=repair_group_name, vm=repair_vm_name)
logger.info("Restarting Repair VM")
restart_ret = _call_az_command(restart_cmd)
logger.debug(restart_ret)
# After the restart, run the script to enable Nested Hyper-V again to create the nested VM.
logger.info("Running win-enable-nested-hyperv.ps1 again to create nested VM")
run_hyperv_command = "az vm repair run -g {g} -n {name} --run-id win-enable-nested-hyperv --parameters gen={gen}" \
.format(g=repair_group_name, name=repair_vm_name, gen=vm_hypervgen)
ret_enable_nested_again = _call_az_command(run_hyperv_command)
logger.debug("stderr: %s", ret_enable_nested_again)
# List all the resources in the repair resource group.
created_resources = _list_resource_ids_in_rg(repair_group_name)
# Set the command status to success.
command.set_status_success()
# Some error happened. Stop command and clean-up resources.
except KeyboardInterrupt:
command.error_stack_trace = traceback.format_exc()
command.error_message = "Command interrupted by user input."
command.message = "Command interrupted by user input. Cleaning up resources."
except AzCommandError as azCommandError:
command.error_stack_trace = traceback.format_exc()
command.error_message = str(azCommandError)
command.message = "Repair create failed. Cleaning up created resources."
except SkuDoesNotSupportHyperV as skuDoesNotSupportHyperV:
command.error_stack_trace = traceback.format_exc()
command.error_message = str(skuDoesNotSupportHyperV)
command.message = "v2 sku does not support nested VM in hyperv. Please run command without --enabled-nested."
except ScriptReturnsError as scriptReturnsError:
command.error_stack_trace = traceback.format_exc()
command.error_message = str(scriptReturnsError)
command.message = "Error returned from script when enabling hyperv."
except SkuNotAvailableError as skuNotAvailableError:
command.error_stack_trace = traceback.format_exc()
command.error_message = str(skuNotAvailableError)
command.message = "Please check if the current subscription can create more VM resources. Cleaning up created resources."
except UnmanagedDiskCopyError as unmanagedDiskCopyError:
command.error_stack_trace = traceback.format_exc()
command.error_message = str(unmanagedDiskCopyError)
command.message = "Repair create failed. Please try again at another time. Cleaning up created resources."
except WindowsOsNotAvailableError:
command.error_stack_trace = traceback.format_exc()
command.error_message = 'Compatible Windows OS image not available.'
command.message = 'A compatible Windows OS image is not available at this time, please check subscription.'
except Exception as exception:
command.error_stack_trace = traceback.format_exc()
command.error_message = str(exception)
command.message = 'An unexpected error occurred. Try running again with the --debug flag to debug.'
finally:
if command.error_stack_trace:
logger.debug(command.error_stack_trace)
# Generate return results depending on command state
if not command.is_status_success():
command.set_status_error()
return_dict = command.init_return_dict()
if existing_rg:
_clean_up_resources(repair_group_name, confirm=True)
else:
_clean_up_resources(repair_group_name, confirm=False)
else:
created_resources.append(copy_disk_id if copy_disk_id is not None else "")
command.message = 'Your repair VM \'{n}\' has been created in the resource group \'{repair_rg}\' with disk \'{d}\' attached as data disk. ' \
'Please use this VM to troubleshoot and repair. Once the repairs are complete use the command ' \
'\'az vm repair restore -n {source_vm} -g {rg} --verbose\' to restore disk to the source VM. ' \
'Note that the copied disk is created within the original resource group \'{rg}\'.' \
.format(n=repair_vm_name, repair_rg=repair_group_name, d=copy_disk_name, rg=resource_group_name, source_vm=vm_name)
return_dict = command.init_return_dict()
# Add additional custom return properties
return_dict['repair_vm_name'] = repair_vm_name
return_dict['copied_disk_name'] = copy_disk_name
return_dict['copied_disk_uri'] = copy_disk_id if copy_disk_id is not None else ""
return_dict['repair_resource_group'] = repair_group_name
return_dict['resource_tag'] = resource_tag
return_dict['created_resources'] = created_resources
logger.info('\n%s\n', command.message)
return return_dict