in src/azure-cli/azure/cli/command_modules/vm/custom.py [0:0]
def create_vm(cmd, vm_name, resource_group_name, image=None, size='Standard_DS1_v2', location=None, tags=None,
              no_wait=False, authentication_type=None, admin_password=None, computer_name=None,
              admin_username=None, ssh_dest_key_path=None, ssh_key_value=None, generate_ssh_keys=False,
              availability_set=None, nics=None, nsg=None, nsg_rule=None, accelerated_networking=None,
              private_ip_address=None, public_ip_address=None, public_ip_address_allocation='dynamic',
              public_ip_address_dns_name=None, public_ip_sku=None, os_disk_name=None, os_type=None,
              storage_account=None, os_caching=None, data_caching=None, storage_container_name=None, storage_sku=None,
              use_unmanaged_disk=False, attach_os_disk=None, os_disk_size_gb=None, attach_data_disks=None,
              data_disk_sizes_gb=None, disk_info=None,
              vnet_name=None, vnet_address_prefix='10.0.0.0/16', subnet=None, subnet_address_prefix='10.0.0.0/24',
              storage_profile=None, os_publisher=None, os_offer=None, os_sku=None, os_version=None,
              storage_account_type=None, vnet_type=None, nsg_type=None, public_ip_address_type=None, nic_type=None,
              validate=False, custom_data=None, secrets=None, plan_name=None, plan_product=None, plan_publisher=None,
              plan_promotion_code=None, license_type=None, assign_identity=None, identity_scope=None,
              identity_role=None, identity_role_id=None, encryption_identity=None,
              application_security_groups=None, zone=None, boot_diagnostics_storage=None, ultra_ssd_enabled=None,
              ephemeral_os_disk=None, ephemeral_os_disk_placement=None,
              proximity_placement_group=None, dedicated_host=None, dedicated_host_group=None, aux_subscriptions=None,
              priority=None, max_price=None, eviction_policy=None, enable_agent=None, workspace=None, vmss=None,
              os_disk_encryption_set=None, data_disk_encryption_sets=None, specialized=None,
              encryption_at_host=None, enable_auto_update=None, patch_mode=None, ssh_key_name=None,
              enable_hotpatching=None, platform_fault_domain=None, security_type=None, enable_secure_boot=None,
              enable_vtpm=None, count=None, edge_zone=None, nic_delete_option=None, os_disk_delete_option=None,
              data_disk_delete_option=None, user_data=None, capacity_reservation_group=None, enable_hibernation=None,
              v_cpus_available=None, v_cpus_per_core=None, accept_term=None,
              disable_integrity_monitoring=None,  # Unused
              enable_integrity_monitoring=False,
              os_disk_security_encryption_type=None, os_disk_secure_vm_disk_encryption_set=None,
              disk_controller_type=None, disable_integrity_monitoring_autoupgrade=False, enable_proxy_agent=None,
              proxy_agent_mode=None, source_snapshots_or_disks=None, source_snapshots_or_disks_size_gb=None,
              source_disk_restore_point=None, source_disk_restore_point_size_gb=None, ssh_key_type=None,
              additional_scheduled_events=None, enable_user_reboot_scheduled_events=None,
              enable_user_redeploy_scheduled_events=None, zone_placement_policy=None, include_zones=None,
              exclude_zones=None, align_regional_disks_to_vm_zone=None, wire_server_mode=None, imds_mode=None,
              wire_server_access_control_profile_reference_id=None, imds_access_control_profile_reference_id=None,
              key_incarnation_id=None):
    # Create one VM (or `count` VMs) by assembling an ARM template and deploying it.
    #
    # Flow:
    #   1. Expand bare disk-encryption-set names into full resource IDs.
    #   2. Add the "new" supporting resources that were requested (storage account, VNet/subnet,
    #      NSG, public IP, NIC) to the template and record their dependsOn relationships.
    #   3. Build the VM resource, then attach plan / identity / encryption identity /
    #      Log Analytics agent as requested.
    #   4. Either validate the deployment (`validate`), start it without waiting (`no_wait`),
    #      or run it to completion.
    #   5. After a completed deployment: optionally install the GuestAttestation extension,
    #      then fetch and return the VM details.
    #
    # Returns:
    #   - the validation result when `validate` is truthy;
    #   - the result of `sdk_no_wait` when `no_wait` is truthy;
    #   - otherwise the details of the single created VM, or a list of them when several were created.
    #
    # Raises CLIError when:
    #   - a subnet cannot be created in an already existing VNet;
    #   - an existing NIC is combined with NSG / public IP / subnet / VNet / ASG arguments;
    #   - `encryption_identity` is used with an unsupported API version, or is not one of the
    #     VM's user assigned identities.
    #
    # Several parameters are accepted but never read in this body.
    # NOTE(review): presumably they are consumed by argument validators before this handler runs, and
    # `disk_info` / `os_type` are presumably populated there too (both are dereferenced
    # unconditionally below) - confirm.
    # NOTE(review): documented with comments rather than a docstring on the assumption that the CLI
    # framework may surface handler docstrings in `--help` output - confirm before converting.
    from azure.cli.core.commands.client_factory import get_subscription_id
    from azure.cli.core.util import random_string, hash_string
    from azure.cli.core.commands.arm import ArmTemplateBuilder
    from azure.cli.command_modules.vm._template_builder import (build_vm_resource,
                                                                build_storage_account_resource, build_nic_resource,
                                                                build_vnet_resource, build_nsg_resource,
                                                                build_public_ip_resource, StorageProfile,
                                                                build_msi_role_assignment,
                                                                build_vm_linux_log_analytics_workspace_agent,
                                                                build_vm_windows_log_analytics_workspace_agent)
    from azure.cli.command_modules.vm._vm_utils import ArmTemplateBuilder20190401
    from azure.mgmt.core.tools import resource_id, is_valid_resource_id, parse_resource_id

    # In the latest profile, the default public IP will be expected to be changed from Basic to Standard,
    # and Basic option will be removed.
    # In order to avoid breaking change which has a big impact to users,
    # we use the hint to guide users to use Standard public IP to create VM in the first stage.
    if cmd.cli_ctx.cloud.profile == 'latest':
        if public_ip_sku == "Basic":
            logger.warning(remove_basic_option_msg, "--public-ip-sku Standard")

    subscription_id = get_subscription_id(cmd.cli_ctx)

    # Disk encryption sets may be given by name; expand those to IDs in the VM's resource group.
    if os_disk_encryption_set is not None and not is_valid_resource_id(os_disk_encryption_set):
        os_disk_encryption_set = resource_id(
            subscription=subscription_id, resource_group=resource_group_name,
            namespace='Microsoft.Compute', type='diskEncryptionSets', name=os_disk_encryption_set)
    if os_disk_secure_vm_disk_encryption_set is not None and \
            not is_valid_resource_id(os_disk_secure_vm_disk_encryption_set):
        os_disk_secure_vm_disk_encryption_set = resource_id(
            subscription=subscription_id, resource_group=resource_group_name,
            namespace='Microsoft.Compute', type='diskEncryptionSets', name=os_disk_secure_vm_disk_encryption_set)

    if data_disk_encryption_sets is None:
        data_disk_encryption_sets = []
    # Entries are rewritten in place, so a list supplied by the caller is updated as well.
    for i, des in enumerate(data_disk_encryption_sets):
        if des is not None and not is_valid_resource_id(des):
            data_disk_encryption_sets[i] = resource_id(
                subscription=subscription_id, resource_group=resource_group_name,
                namespace='Microsoft.Compute', type='diskEncryptionSets', name=des)

    # The `storage_sku` argument is always replaced by the OS disk's storage account type.
    storage_sku = disk_info['os'].get('storageAccountType')

    network_id_template = resource_id(
        subscription=subscription_id, resource_group=resource_group_name,
        namespace='Microsoft.Network')

    vm_id = resource_id(
        subscription=subscription_id, resource_group=resource_group_name,
        namespace='Microsoft.Compute', type='virtualMachines', name=vm_name)

    # determine final defaults and calculated values
    tags = tags or {}
    os_disk_name = os_disk_name or ('osdisk_{}'.format(hash_string(vm_id, length=10)) if use_unmanaged_disk else None)
    storage_container_name = storage_container_name or 'vhds'

    # Build up the ARM template
    if count is None:
        master_template = ArmTemplateBuilder()
    else:
        master_template = ArmTemplateBuilder20190401()

    vm_dependencies = []
    if storage_account_type == 'new':
        storage_account = storage_account or 'vhdstorage{}'.format(
            hash_string(vm_id, length=14, force_lower=True))
        vm_dependencies.append('Microsoft.Storage/storageAccounts/{}'.format(storage_account))
        master_template.add_resource(build_storage_account_resource(cmd, storage_account, location,
                                                                    tags, storage_sku, edge_zone))

    nic_name = None
    if nic_type == 'new':
        nic_name = '{}VMNic'.format(vm_name)
        nic_full_name = 'Microsoft.Network/networkInterfaces/{}'.format(nic_name)
        if count:
            # One NIC per VM copy; each is named with its copy index as a suffix.
            vm_dependencies.extend([nic_full_name + str(i) for i in range(count)])
        else:
            vm_dependencies.append(nic_full_name)

        nic_dependencies = []
        if vnet_type == 'new':
            subnet = subnet or '{}Subnet'.format(vm_name)
            vnet_exists = False
            if vnet_name:
                from azure.cli.command_modules.vm._vm_utils import check_existence
                vnet_exists = \
                    check_existence(cmd.cli_ctx, vnet_name, resource_group_name, 'Microsoft.Network', 'virtualNetworks')
                if vnet_exists:
                    # The VNet is already there, so only the subnet is created, directly rather than
                    # through the template.
                    SubnetCreate = import_aaz_by_profile(cmd.cli_ctx.cloud.profile, "network.vnet.subnet").Create
                    try:
                        poller = SubnetCreate(cli_ctx=cmd.cli_ctx)(command_args={
                            'name': subnet,
                            'vnet_name': vnet_name,
                            'resource_group': resource_group_name,
                            'address_prefixes': [subnet_address_prefix],
                            'address_prefix': subnet_address_prefix
                        })
                        LongRunningOperation(cmd.cli_ctx)(poller)
                    except Exception as ex:
                        # Chain the original failure so the root cause stays visible in tracebacks.
                        raise CLIError('Subnet({}) does not exist, but failed to create a new subnet with address '
                                       'prefix {}. It may be caused by name or address prefix conflict. Please specify '
                                       'an appropriate subnet name with --subnet or a valid address prefix value with '
                                       '--subnet-address-prefix.'.format(subnet, subnet_address_prefix)) from ex
            if not vnet_exists:
                vnet_name = vnet_name or '{}VNET'.format(vm_name)
                nic_dependencies.append('Microsoft.Network/virtualNetworks/{}'.format(vnet_name))
                master_template.add_resource(build_vnet_resource(cmd, vnet_name, location, tags, vnet_address_prefix,
                                                                 subnet, subnet_address_prefix, edge_zone=edge_zone))

        if nsg_type == 'new':
            if nsg_rule is None:
                nsg_rule = 'RDP' if os_type.lower() == 'windows' else 'SSH'
            nsg = nsg or '{}NSG'.format(vm_name)
            nic_dependencies.append('Microsoft.Network/networkSecurityGroups/{}'.format(nsg))
            master_template.add_resource(build_nsg_resource(cmd, nsg, location, tags, nsg_rule))

        if public_ip_address_type == 'new':
            public_ip_address = public_ip_address or '{}PublicIP'.format(vm_name)
            public_ip_address_full_name = 'Microsoft.Network/publicIpAddresses/{}'.format(public_ip_address)
            if count:
                nic_dependencies.extend([public_ip_address_full_name + str(i) for i in range(count)])
            else:
                nic_dependencies.append(public_ip_address_full_name)
            master_template.add_resource(build_public_ip_resource(cmd, public_ip_address, location, tags,
                                                                  public_ip_address_allocation,
                                                                  public_ip_address_dns_name,
                                                                  public_ip_sku, zone, count, edge_zone))

        subnet_id = subnet if is_valid_resource_id(subnet) else \
            '{}/virtualNetworks/{}/subnets/{}'.format(network_id_template, vnet_name, subnet)

        nsg_id = None
        if nsg:
            nsg_id = nsg if is_valid_resource_id(nsg) else \
                '{}/networkSecurityGroups/{}'.format(network_id_template, nsg)

        public_ip_address_id = None
        if public_ip_address:
            public_ip_address_id = public_ip_address if is_valid_resource_id(public_ip_address) \
                else '{}/publicIPAddresses/{}'.format(network_id_template, public_ip_address)

        nics_id = '{}/networkInterfaces/{}'.format(network_id_template, nic_name)
        # With `count`, the reference is an ARM expression that appends the copy index to the NIC ID.
        nic_reference_id = "[concat('{}', copyIndex())]".format(nics_id) if count else nics_id
        nics = [
            {
                'id': nic_reference_id,
                'properties': {
                    'deleteOption': nic_delete_option
                }
            }
        ]

        nic_resource = build_nic_resource(
            cmd, nic_name, location, tags, vm_name, subnet_id, private_ip_address, nsg_id,
            public_ip_address_id, application_security_groups, accelerated_networking=accelerated_networking,
            count=count, edge_zone=edge_zone)
        nic_resource['dependsOn'] = nic_dependencies
        master_template.add_resource(nic_resource)
    else:
        # Using an existing NIC
        invalid_parameters = [nsg, public_ip_address, subnet, vnet_name, application_security_groups]
        if any(invalid_parameters):
            raise CLIError('When specifying an existing NIC, do not specify NSG, '
                           'public IP, ASGs, VNet or subnet.')
        if accelerated_networking is not None:
            logger.warning('When specifying an existing NIC, do not specify accelerated networking. '
                           'Ignore --accelerated-networking now. '
                           'This will trigger an error instead of a warning in future releases.')

    os_vhd_uri = None
    if storage_profile in [StorageProfile.SACustomImage, StorageProfile.SAPirImage]:
        # `storage_account` may be a name or a resource ID; the last path segment is the name.
        storage_account_name = storage_account.rsplit('/', 1)[-1]
        os_vhd_uri = 'https://{}.blob.{}/{}/{}.vhd'.format(
            storage_account_name, cmd.cli_ctx.cloud.suffixes.storage_endpoint, storage_container_name, os_disk_name)
    elif storage_profile == StorageProfile.SASpecializedOSDisk:
        os_vhd_uri = attach_os_disk
        # Disk name = last URI segment with its final four characters dropped
        # (presumably the '.vhd' extension).
        os_disk_name = attach_os_disk.rsplit('/', 1)[1][:-4]

    if custom_data:
        custom_data = read_content_if_is_file(custom_data)

    if user_data:
        user_data = read_content_if_is_file(user_data)

    if secrets:
        secrets = _merge_secrets([validate_file_or_dict(secret) for secret in secrets])

    vm_resource = build_vm_resource(
        cmd=cmd, name=vm_name, location=location, tags=tags, size=size, storage_profile=storage_profile, nics=nics,
        admin_username=admin_username, availability_set_id=availability_set, admin_password=admin_password,
        ssh_key_values=ssh_key_value, ssh_key_path=ssh_dest_key_path, image_reference=image,
        os_disk_name=os_disk_name, custom_image_os_type=os_type, authentication_type=authentication_type,
        os_publisher=os_publisher, os_offer=os_offer, os_sku=os_sku, os_version=os_version, os_vhd_uri=os_vhd_uri,
        attach_os_disk=attach_os_disk, os_disk_size_gb=os_disk_size_gb, custom_data=custom_data, secrets=secrets,
        license_type=license_type, zone=zone, disk_info=disk_info,
        boot_diagnostics_storage_uri=boot_diagnostics_storage, ultra_ssd_enabled=ultra_ssd_enabled,
        proximity_placement_group=proximity_placement_group, computer_name=computer_name,
        dedicated_host=dedicated_host, priority=priority, max_price=max_price, eviction_policy=eviction_policy,
        enable_agent=enable_agent, vmss=vmss, os_disk_encryption_set=os_disk_encryption_set,
        data_disk_encryption_sets=data_disk_encryption_sets, specialized=specialized,
        encryption_at_host=encryption_at_host, dedicated_host_group=dedicated_host_group,
        enable_auto_update=enable_auto_update, patch_mode=patch_mode, enable_hotpatching=enable_hotpatching,
        platform_fault_domain=platform_fault_domain, security_type=security_type, enable_secure_boot=enable_secure_boot,
        enable_vtpm=enable_vtpm, count=count, edge_zone=edge_zone, os_disk_delete_option=os_disk_delete_option,
        user_data=user_data, capacity_reservation_group=capacity_reservation_group,
        enable_hibernation=enable_hibernation, v_cpus_available=v_cpus_available, v_cpus_per_core=v_cpus_per_core,
        os_disk_security_encryption_type=os_disk_security_encryption_type,
        os_disk_secure_vm_disk_encryption_set=os_disk_secure_vm_disk_encryption_set,
        disk_controller_type=disk_controller_type, enable_proxy_agent=enable_proxy_agent,
        proxy_agent_mode=proxy_agent_mode, additional_scheduled_events=additional_scheduled_events,
        enable_user_reboot_scheduled_events=enable_user_reboot_scheduled_events,
        enable_user_redeploy_scheduled_events=enable_user_redeploy_scheduled_events,
        zone_placement_policy=zone_placement_policy, include_zones=include_zones, exclude_zones=exclude_zones,
        align_regional_disks_to_vm_zone=align_regional_disks_to_vm_zone, wire_server_mode=wire_server_mode,
        imds_mode=imds_mode,
        wire_server_access_control_profile_reference_id=wire_server_access_control_profile_reference_id,
        imds_access_control_profile_reference_id=imds_access_control_profile_reference_id,
        key_incarnation_id=key_incarnation_id)

    vm_resource['dependsOn'] = vm_dependencies

    if plan_name:
        vm_resource['plan'] = {
            'name': plan_name,
            'publisher': plan_publisher,
            'product': plan_product,
            'promotionCode': plan_promotion_code
        }

    enable_local_identity = None
    if assign_identity is not None:
        vm_resource['identity'], _, _, enable_local_identity = _build_identities_info(assign_identity)
        role_assignment_guid = None
        if identity_scope:
            role_assignment_guid = str(_gen_guid())
            master_template.add_resource(build_msi_role_assignment(vm_name, vm_id, identity_role_id,
                                                                   role_assignment_guid, identity_scope))

    if encryption_identity:
        if not cmd.supported_api_version(min_api='2023-09-01', resource_type=ResourceType.MGMT_COMPUTE):
            # Each fragment ends with a space so the concatenated sentences do not run together.
            raise CLIError("Usage error: Encryption Identity required API version 2023-09-01 or higher. "
                           "You can set the cloud's profile to use the required API Version with: "
                           "az cloud set --profile latest --name <cloud name>")

        # The encryption identity must be one of the VM's user assigned identities
        # (IDs are compared case-insensitively).
        if 'identity' in vm_resource and 'userAssignedIdentities' in vm_resource['identity'] \
                and encryption_identity.lower() in \
                (k.lower() for k in vm_resource['identity']['userAssignedIdentities'].keys()):
            security_profile = vm_resource['properties'].setdefault('securityProfile', {})
            encryption_identity_profile = security_profile.setdefault('encryptionIdentity', {})
            encryption_identity_profile['userAssignedIdentityResourceId'] = encryption_identity
        else:
            raise CLIError("Encryption Identity should be an ARM Resource ID of one of the "
                           "user assigned identities associated to the resource")

    if workspace is not None:
        workspace_id = _prepare_workspace(cmd, resource_group_name, workspace)
        master_template.add_secure_parameter('workspaceId', workspace_id)
        if os_type.lower() == 'linux':
            vm_mmaExtension_resource = build_vm_linux_log_analytics_workspace_agent(cmd, vm_name, location)
            master_template.add_resource(vm_mmaExtension_resource)
        elif os_type.lower() == 'windows':
            vm_mmaExtension_resource = build_vm_windows_log_analytics_workspace_agent(cmd, vm_name, location)
            master_template.add_resource(vm_mmaExtension_resource)
        else:
            logger.warning("Unsupported OS type. Skip the connection step for log analytics workspace.")

    master_template.add_resource(vm_resource)

    if admin_password:
        master_template.add_secure_parameter('adminPassword', admin_password)

    template = master_template.build()
    parameters = master_template.build_parameters()

    # deploy ARM template
    deployment_name = 'vm_deploy_' + random_string(32)
    client = get_mgmt_service_client(cmd.cli_ctx, ResourceType.MGMT_RESOURCE_RESOURCES,
                                     aux_subscriptions=aux_subscriptions).deployments
    DeploymentProperties = cmd.get_models('DeploymentProperties', resource_type=ResourceType.MGMT_RESOURCE_RESOURCES)
    properties = DeploymentProperties(template=template, parameters=parameters, mode='incremental')
    Deployment = cmd.get_models('Deployment', resource_type=ResourceType.MGMT_RESOURCE_RESOURCES)
    deployment = Deployment(properties=properties)

    if validate:
        from azure.cli.command_modules.vm._vm_utils import log_pprint_template
        log_pprint_template(template)
        log_pprint_template(parameters)

        if cmd.supported_api_version(min_api='2019-10-01', resource_type=ResourceType.MGMT_RESOURCE_RESOURCES):
            validation_poller = client.begin_validate(resource_group_name, deployment_name, deployment)
            return LongRunningOperation(cmd.cli_ctx)(validation_poller)

        return client.validate(resource_group_name, deployment_name, deployment)

    # creates the VM deployment
    if no_wait:
        return sdk_no_wait(no_wait, client.begin_create_or_update, resource_group_name, deployment_name, deployment)
    LongRunningOperation(cmd.cli_ctx)(client.begin_create_or_update(resource_group_name, deployment_name, deployment))

    # Guest Attestation Extension and enable System Assigned MSI by default
    is_trusted_launch = security_type and security_type.lower() == 'trustedlaunch' and \
        enable_vtpm and enable_secure_boot

    is_confidential_vm = security_type and security_type.lower() == 'confidentialvm'

    if (is_trusted_launch or is_confidential_vm) and enable_integrity_monitoring:
        vm = get_vm(cmd, resource_group_name, vm_name, 'instanceView')
        client = _compute_client_factory(cmd.cli_ctx)
        vm_os_type = vm.storage_profile.os_disk.os_type
        if vm_os_type == 'Linux':
            publisher = 'Microsoft.Azure.Security.LinuxAttestation'
        elif vm_os_type == 'Windows':
            publisher = 'Microsoft.Azure.Security.WindowsAttestation'
        else:
            publisher = None

        if publisher is None:
            # The VM is already deployed at this point and the extension is best effort, so an
            # unrecognised OS type is reported instead of failing on an unbound `publisher`.
            logger.warning('Skip installing Guest Attestation Extension because the OS type "%s" '
                           'is not supported.', vm_os_type)
        else:
            version = _normalize_extension_version(cmd.cli_ctx, publisher, 'GuestAttestation', None, vm.location)
            VirtualMachineExtension = cmd.get_models('VirtualMachineExtension')
            ext = VirtualMachineExtension(location=vm.location,
                                          publisher=publisher,
                                          type_properties_type='GuestAttestation',
                                          protected_settings=None,
                                          type_handler_version=version,
                                          settings=None,
                                          auto_upgrade_minor_version=True,
                                          enable_automatic_upgrade=not disable_integrity_monitoring_autoupgrade)
            try:
                LongRunningOperation(cmd.cli_ctx)(client.virtual_machine_extensions.begin_create_or_update(
                    resource_group_name, vm_name, 'GuestAttestation', ext))
                logger.info('Guest Attestation Extension has been successfully installed by default '
                            'when Trusted Launch configuration is met')
            except Exception as e:
                # Deliberately non-fatal: the VM exists, so the failure is only logged.
                error_type = "Trusted Launch" if is_trusted_launch else "Confidential VM"
                logger.error('Failed to install Guest Attestation Extension for %s. %s', error_type, e)

    vm_names = [vm_name + str(i) for i in range(count)] if count else [vm_name]
    vms = []
    # Use vm_name2 to avoid R1704: Redefining argument with the local name 'vm_name' (redefined-argument-from-local)
    for vm_name2 in vm_names:
        vm = get_vm_details(cmd, resource_group_name, vm_name2)
        if assign_identity is not None:
            if enable_local_identity and not identity_scope:
                _show_missing_access_warning(resource_group_name, vm_name2, 'vm')
            setattr(vm, 'identity', _construct_identity_info(identity_scope, identity_role, vm.identity.principal_id,
                                                             vm.identity.user_assigned_identities))
        vms.append(vm)

    if workspace is not None:
        # `workspace_id` was assigned in the earlier `workspace is not None` branch.
        workspace_name = parse_resource_id(workspace_id)['name']
        _set_data_source_for_workspace(cmd, os_type, resource_group_name, workspace_name)

    if len(vms) == 1:
        return vms[0]

    return vms