def create_vm()

in src/azure-cli/azure/cli/command_modules/vm/custom.py [0:0]


def create_vm(cmd, vm_name, resource_group_name, image=None, size='Standard_DS1_v2', location=None, tags=None,
              no_wait=False, authentication_type=None, admin_password=None, computer_name=None,
              admin_username=None, ssh_dest_key_path=None, ssh_key_value=None, generate_ssh_keys=False,
              availability_set=None, nics=None, nsg=None, nsg_rule=None, accelerated_networking=None,
              private_ip_address=None, public_ip_address=None, public_ip_address_allocation='dynamic',
              public_ip_address_dns_name=None, public_ip_sku=None, os_disk_name=None, os_type=None,
              storage_account=None, os_caching=None, data_caching=None, storage_container_name=None, storage_sku=None,
              use_unmanaged_disk=False, attach_os_disk=None, os_disk_size_gb=None, attach_data_disks=None,
              data_disk_sizes_gb=None, disk_info=None,
              vnet_name=None, vnet_address_prefix='10.0.0.0/16', subnet=None, subnet_address_prefix='10.0.0.0/24',
              storage_profile=None, os_publisher=None, os_offer=None, os_sku=None, os_version=None,
              storage_account_type=None, vnet_type=None, nsg_type=None, public_ip_address_type=None, nic_type=None,
              validate=False, custom_data=None, secrets=None, plan_name=None, plan_product=None, plan_publisher=None,
              plan_promotion_code=None, license_type=None, assign_identity=None, identity_scope=None,
              identity_role=None, identity_role_id=None, encryption_identity=None,
              application_security_groups=None, zone=None, boot_diagnostics_storage=None, ultra_ssd_enabled=None,
              ephemeral_os_disk=None, ephemeral_os_disk_placement=None,
              proximity_placement_group=None, dedicated_host=None, dedicated_host_group=None, aux_subscriptions=None,
              priority=None, max_price=None, eviction_policy=None, enable_agent=None, workspace=None, vmss=None,
              os_disk_encryption_set=None, data_disk_encryption_sets=None, specialized=None,
              encryption_at_host=None, enable_auto_update=None, patch_mode=None, ssh_key_name=None,
              enable_hotpatching=None, platform_fault_domain=None, security_type=None, enable_secure_boot=None,
              enable_vtpm=None, count=None, edge_zone=None, nic_delete_option=None, os_disk_delete_option=None,
              data_disk_delete_option=None, user_data=None, capacity_reservation_group=None, enable_hibernation=None,
              v_cpus_available=None, v_cpus_per_core=None, accept_term=None,
              disable_integrity_monitoring=None,  # Unused
              enable_integrity_monitoring=False,
              os_disk_security_encryption_type=None, os_disk_secure_vm_disk_encryption_set=None,
              disk_controller_type=None, disable_integrity_monitoring_autoupgrade=False, enable_proxy_agent=None,
              proxy_agent_mode=None, source_snapshots_or_disks=None, source_snapshots_or_disks_size_gb=None,
              source_disk_restore_point=None, source_disk_restore_point_size_gb=None, ssh_key_type=None,
              additional_scheduled_events=None, enable_user_reboot_scheduled_events=None,
              enable_user_redeploy_scheduled_events=None, zone_placement_policy=None, include_zones=None,
              exclude_zones=None, align_regional_disks_to_vm_zone=None, wire_server_mode=None, imds_mode=None,
              wire_server_access_control_profile_reference_id=None, imds_access_control_profile_reference_id=None,
              key_incarnation_id=None):

    from azure.cli.core.commands.client_factory import get_subscription_id
    from azure.cli.core.util import random_string, hash_string
    from azure.cli.core.commands.arm import ArmTemplateBuilder
    from azure.cli.command_modules.vm._template_builder import (build_vm_resource,
                                                                build_storage_account_resource, build_nic_resource,
                                                                build_vnet_resource, build_nsg_resource,
                                                                build_public_ip_resource, StorageProfile,
                                                                build_msi_role_assignment,
                                                                build_vm_linux_log_analytics_workspace_agent,
                                                                build_vm_windows_log_analytics_workspace_agent)
    from azure.cli.command_modules.vm._vm_utils import ArmTemplateBuilder20190401
    from azure.mgmt.core.tools import resource_id, is_valid_resource_id, parse_resource_id

    # In the latest profile, the default public IP will be expected to be changed from Basic to Standard,
    # and Basic option will be removed.
    # In order to avoid breaking change which has a big impact to users,
    # we use the hint to guide users to use Standard public IP to create VM in the first stage.
    if cmd.cli_ctx.cloud.profile == 'latest':
        if public_ip_sku == "Basic":
            logger.warning(remove_basic_option_msg, "--public-ip-sku Standard")

    subscription_id = get_subscription_id(cmd.cli_ctx)
    if os_disk_encryption_set is not None and not is_valid_resource_id(os_disk_encryption_set):
        os_disk_encryption_set = resource_id(
            subscription=subscription_id, resource_group=resource_group_name,
            namespace='Microsoft.Compute', type='diskEncryptionSets', name=os_disk_encryption_set)
    if os_disk_secure_vm_disk_encryption_set is not None and\
            not is_valid_resource_id(os_disk_secure_vm_disk_encryption_set):
        os_disk_secure_vm_disk_encryption_set = resource_id(
            subscription=subscription_id, resource_group=resource_group_name,
            namespace='Microsoft.Compute', type='diskEncryptionSets', name=os_disk_secure_vm_disk_encryption_set)

    if data_disk_encryption_sets is None:
        data_disk_encryption_sets = []
    for i, des in enumerate(data_disk_encryption_sets):
        if des is not None and not is_valid_resource_id(des):
            data_disk_encryption_sets[i] = resource_id(
                subscription=subscription_id, resource_group=resource_group_name,
                namespace='Microsoft.Compute', type='diskEncryptionSets', name=des)

    storage_sku = disk_info['os'].get('storageAccountType')

    network_id_template = resource_id(
        subscription=subscription_id, resource_group=resource_group_name,
        namespace='Microsoft.Network')

    vm_id = resource_id(
        subscription=subscription_id, resource_group=resource_group_name,
        namespace='Microsoft.Compute', type='virtualMachines', name=vm_name)

    # determine final defaults and calculated values
    tags = tags or {}
    os_disk_name = os_disk_name or ('osdisk_{}'.format(hash_string(vm_id, length=10)) if use_unmanaged_disk else None)
    storage_container_name = storage_container_name or 'vhds'

    # Build up the ARM template
    if count is None:
        master_template = ArmTemplateBuilder()
    else:
        master_template = ArmTemplateBuilder20190401()

    vm_dependencies = []
    if storage_account_type == 'new':
        storage_account = storage_account or 'vhdstorage{}'.format(
            hash_string(vm_id, length=14, force_lower=True))
        vm_dependencies.append('Microsoft.Storage/storageAccounts/{}'.format(storage_account))
        master_template.add_resource(build_storage_account_resource(cmd, storage_account, location,
                                                                    tags, storage_sku, edge_zone))

    nic_name = None
    if nic_type == 'new':
        nic_name = '{}VMNic'.format(vm_name)
        nic_full_name = 'Microsoft.Network/networkInterfaces/{}'.format(nic_name)
        if count:
            vm_dependencies.extend([nic_full_name + str(i) for i in range(count)])
        else:
            vm_dependencies.append(nic_full_name)

        nic_dependencies = []
        if vnet_type == 'new':
            subnet = subnet or '{}Subnet'.format(vm_name)
            vnet_exists = False
            if vnet_name:
                from azure.cli.command_modules.vm._vm_utils import check_existence
                vnet_exists = \
                    check_existence(cmd.cli_ctx, vnet_name, resource_group_name, 'Microsoft.Network', 'virtualNetworks')
                if vnet_exists:
                    SubnetCreate = import_aaz_by_profile(cmd.cli_ctx.cloud.profile, "network.vnet.subnet").Create
                    try:
                        poller = SubnetCreate(cli_ctx=cmd.cli_ctx)(command_args={
                            'name': subnet,
                            'vnet_name': vnet_name,
                            'resource_group': resource_group_name,
                            'address_prefixes': [subnet_address_prefix],
                            'address_prefix': subnet_address_prefix
                        })
                        LongRunningOperation(cmd.cli_ctx)(poller)
                    except Exception:
                        raise CLIError('Subnet({}) does not exist, but failed to create a new subnet with address '
                                       'prefix {}. It may be caused by name or address prefix conflict. Please specify '
                                       'an appropriate subnet name with --subnet or a valid address prefix value with '
                                       '--subnet-address-prefix.'.format(subnet, subnet_address_prefix))
            if not vnet_exists:
                vnet_name = vnet_name or '{}VNET'.format(vm_name)
                nic_dependencies.append('Microsoft.Network/virtualNetworks/{}'.format(vnet_name))
                master_template.add_resource(build_vnet_resource(cmd, vnet_name, location, tags, vnet_address_prefix,
                                                                 subnet, subnet_address_prefix, edge_zone=edge_zone))

        if nsg_type == 'new':
            if nsg_rule is None:
                nsg_rule = 'RDP' if os_type.lower() == 'windows' else 'SSH'
            nsg = nsg or '{}NSG'.format(vm_name)
            nic_dependencies.append('Microsoft.Network/networkSecurityGroups/{}'.format(nsg))
            master_template.add_resource(build_nsg_resource(cmd, nsg, location, tags, nsg_rule))

        if public_ip_address_type == 'new':
            public_ip_address = public_ip_address or '{}PublicIP'.format(vm_name)
            public_ip_address_full_name = 'Microsoft.Network/publicIpAddresses/{}'.format(public_ip_address)
            if count:
                nic_dependencies.extend([public_ip_address_full_name + str(i) for i in range(count)])
            else:
                nic_dependencies.append(public_ip_address_full_name)
            master_template.add_resource(build_public_ip_resource(cmd, public_ip_address, location, tags,
                                                                  public_ip_address_allocation,
                                                                  public_ip_address_dns_name,
                                                                  public_ip_sku, zone, count, edge_zone))

        subnet_id = subnet if is_valid_resource_id(subnet) else \
            '{}/virtualNetworks/{}/subnets/{}'.format(network_id_template, vnet_name, subnet)

        nsg_id = None
        if nsg:
            nsg_id = nsg if is_valid_resource_id(nsg) else \
                '{}/networkSecurityGroups/{}'.format(network_id_template, nsg)

        public_ip_address_id = None
        if public_ip_address:
            public_ip_address_id = public_ip_address if is_valid_resource_id(public_ip_address) \
                else '{}/publicIPAddresses/{}'.format(network_id_template, public_ip_address)

        nics_id = '{}/networkInterfaces/{}'.format(network_id_template, nic_name)

        if count:
            nics = [
                {
                    'id': "[concat('{}', copyIndex())]".format(nics_id),
                    'properties': {
                        'deleteOption': nic_delete_option
                    }
                }
            ]
        else:
            nics = [
                {
                    'id': nics_id,
                    'properties': {
                        'deleteOption': nic_delete_option
                    }
                }
            ]

        nic_resource = build_nic_resource(
            cmd, nic_name, location, tags, vm_name, subnet_id, private_ip_address, nsg_id,
            public_ip_address_id, application_security_groups, accelerated_networking=accelerated_networking,
            count=count, edge_zone=edge_zone)
        nic_resource['dependsOn'] = nic_dependencies
        master_template.add_resource(nic_resource)
    else:
        # Using an existing NIC
        invalid_parameters = [nsg, public_ip_address, subnet, vnet_name, application_security_groups]
        if any(invalid_parameters):
            raise CLIError('When specifying an existing NIC, do not specify NSG, '
                           'public IP, ASGs, VNet or subnet.')
        if accelerated_networking is not None:
            logger.warning('When specifying an existing NIC, do not specify accelerated networking. '
                           'Ignore --accelerated-networking now. '
                           'This will trigger an error instead of a warning in future releases.')

    os_vhd_uri = None
    if storage_profile in [StorageProfile.SACustomImage, StorageProfile.SAPirImage]:
        storage_account_name = storage_account.rsplit('/', 1)
        storage_account_name = storage_account_name[1] if \
            len(storage_account_name) > 1 else storage_account_name[0]
        os_vhd_uri = 'https://{}.blob.{}/{}/{}.vhd'.format(
            storage_account_name, cmd.cli_ctx.cloud.suffixes.storage_endpoint, storage_container_name, os_disk_name)
    elif storage_profile == StorageProfile.SASpecializedOSDisk:
        os_vhd_uri = attach_os_disk
        os_disk_name = attach_os_disk.rsplit('/', 1)[1][:-4]

    if custom_data:
        custom_data = read_content_if_is_file(custom_data)

    if user_data:
        user_data = read_content_if_is_file(user_data)

    if secrets:
        secrets = _merge_secrets([validate_file_or_dict(secret) for secret in secrets])

    vm_resource = build_vm_resource(
        cmd=cmd, name=vm_name, location=location, tags=tags, size=size, storage_profile=storage_profile, nics=nics,
        admin_username=admin_username, availability_set_id=availability_set, admin_password=admin_password,
        ssh_key_values=ssh_key_value, ssh_key_path=ssh_dest_key_path, image_reference=image,
        os_disk_name=os_disk_name, custom_image_os_type=os_type, authentication_type=authentication_type,
        os_publisher=os_publisher, os_offer=os_offer, os_sku=os_sku, os_version=os_version, os_vhd_uri=os_vhd_uri,
        attach_os_disk=attach_os_disk, os_disk_size_gb=os_disk_size_gb, custom_data=custom_data, secrets=secrets,
        license_type=license_type, zone=zone, disk_info=disk_info,
        boot_diagnostics_storage_uri=boot_diagnostics_storage, ultra_ssd_enabled=ultra_ssd_enabled,
        proximity_placement_group=proximity_placement_group, computer_name=computer_name,
        dedicated_host=dedicated_host, priority=priority, max_price=max_price, eviction_policy=eviction_policy,
        enable_agent=enable_agent, vmss=vmss, os_disk_encryption_set=os_disk_encryption_set,
        data_disk_encryption_sets=data_disk_encryption_sets, specialized=specialized,
        encryption_at_host=encryption_at_host, dedicated_host_group=dedicated_host_group,
        enable_auto_update=enable_auto_update, patch_mode=patch_mode, enable_hotpatching=enable_hotpatching,
        platform_fault_domain=platform_fault_domain, security_type=security_type, enable_secure_boot=enable_secure_boot,
        enable_vtpm=enable_vtpm, count=count, edge_zone=edge_zone, os_disk_delete_option=os_disk_delete_option,
        user_data=user_data, capacity_reservation_group=capacity_reservation_group,
        enable_hibernation=enable_hibernation, v_cpus_available=v_cpus_available, v_cpus_per_core=v_cpus_per_core,
        os_disk_security_encryption_type=os_disk_security_encryption_type,
        os_disk_secure_vm_disk_encryption_set=os_disk_secure_vm_disk_encryption_set,
        disk_controller_type=disk_controller_type, enable_proxy_agent=enable_proxy_agent,
        proxy_agent_mode=proxy_agent_mode, additional_scheduled_events=additional_scheduled_events,
        enable_user_reboot_scheduled_events=enable_user_reboot_scheduled_events,
        enable_user_redeploy_scheduled_events=enable_user_redeploy_scheduled_events,
        zone_placement_policy=zone_placement_policy, include_zones=include_zones, exclude_zones=exclude_zones,
        align_regional_disks_to_vm_zone=align_regional_disks_to_vm_zone, wire_server_mode=wire_server_mode,
        imds_mode=imds_mode,
        wire_server_access_control_profile_reference_id=wire_server_access_control_profile_reference_id,
        imds_access_control_profile_reference_id=imds_access_control_profile_reference_id,
        key_incarnation_id=key_incarnation_id)

    vm_resource['dependsOn'] = vm_dependencies

    if plan_name:
        vm_resource['plan'] = {
            'name': plan_name,
            'publisher': plan_publisher,
            'product': plan_product,
            'promotionCode': plan_promotion_code
        }

    enable_local_identity = None
    if assign_identity is not None:
        vm_resource['identity'], _, _, enable_local_identity = _build_identities_info(assign_identity)
        role_assignment_guid = None
        if identity_scope:
            role_assignment_guid = str(_gen_guid())
            master_template.add_resource(build_msi_role_assignment(vm_name, vm_id, identity_role_id,
                                                                   role_assignment_guid, identity_scope))

    if encryption_identity:
        if not cmd.supported_api_version(min_api='2023-09-01', resource_type=ResourceType.MGMT_COMPUTE):
            raise CLIError("Usage error: Encryption Identity required API version 2023-09-01 or higher."
                           "You can set the cloud's profile to use the required API Version with:"
                           "az cloud set --profile latest --name <cloud name>")

        if 'identity' in vm_resource and 'userAssignedIdentities' in vm_resource['identity'] \
            and encryption_identity.lower() in \
                (k.lower() for k in vm_resource['identity']['userAssignedIdentities'].keys()):
            if 'securityProfile' not in vm_resource['properties']:
                vm_resource['properties']['securityProfile'] = {}
            if 'encryptionIdentity' not in vm_resource['properties']['securityProfile']:
                vm_resource['properties']['securityProfile']['encryptionIdentity'] = {}

            vm_securityProfile_EncryptionIdentity = vm_resource['properties']['securityProfile']['encryptionIdentity']

            if 'userAssignedIdentityResourceId' not in vm_securityProfile_EncryptionIdentity or \
                    vm_securityProfile_EncryptionIdentity['userAssignedIdentityResourceId'] != encryption_identity:
                vm_resource['properties']['securityProfile']['encryptionIdentity']['userAssignedIdentityResourceId'] \
                    = encryption_identity
        else:
            raise CLIError("Encryption Identity should be an ARM Resource ID of one of the "
                           "user assigned identities associated to the resource")

    if workspace is not None:
        workspace_id = _prepare_workspace(cmd, resource_group_name, workspace)
        master_template.add_secure_parameter('workspaceId', workspace_id)
        if os_type.lower() == 'linux':
            vm_mmaExtension_resource = build_vm_linux_log_analytics_workspace_agent(cmd, vm_name, location)
            master_template.add_resource(vm_mmaExtension_resource)
        elif os_type.lower() == 'windows':
            vm_mmaExtension_resource = build_vm_windows_log_analytics_workspace_agent(cmd, vm_name, location)
            master_template.add_resource(vm_mmaExtension_resource)
        else:
            logger.warning("Unsupported OS type. Skip the connection step for log analytics workspace.")

    master_template.add_resource(vm_resource)

    if admin_password:
        master_template.add_secure_parameter('adminPassword', admin_password)

    template = master_template.build()
    parameters = master_template.build_parameters()

    # deploy ARM template
    deployment_name = 'vm_deploy_' + random_string(32)
    client = get_mgmt_service_client(cmd.cli_ctx, ResourceType.MGMT_RESOURCE_RESOURCES,
                                     aux_subscriptions=aux_subscriptions).deployments
    DeploymentProperties = cmd.get_models('DeploymentProperties', resource_type=ResourceType.MGMT_RESOURCE_RESOURCES)
    properties = DeploymentProperties(template=template, parameters=parameters, mode='incremental')
    Deployment = cmd.get_models('Deployment', resource_type=ResourceType.MGMT_RESOURCE_RESOURCES)
    deployment = Deployment(properties=properties)

    if validate:
        from azure.cli.command_modules.vm._vm_utils import log_pprint_template
        log_pprint_template(template)
        log_pprint_template(parameters)

        if cmd.supported_api_version(min_api='2019-10-01', resource_type=ResourceType.MGMT_RESOURCE_RESOURCES):
            validation_poller = client.begin_validate(resource_group_name, deployment_name, deployment)
            return LongRunningOperation(cmd.cli_ctx)(validation_poller)

        return client.validate(resource_group_name, deployment_name, deployment)

    # creates the VM deployment
    if no_wait:
        return sdk_no_wait(no_wait, client.begin_create_or_update, resource_group_name, deployment_name, deployment)
    LongRunningOperation(cmd.cli_ctx)(client.begin_create_or_update(resource_group_name, deployment_name, deployment))

    # Guest Attestation Extension and enable System Assigned MSI by default
    is_trusted_launch = security_type and security_type.lower() == 'trustedlaunch' and\
        enable_vtpm and enable_secure_boot
    is_confidential_vm = security_type and security_type.lower() == 'confidentialvm'
    if (is_trusted_launch or is_confidential_vm) and enable_integrity_monitoring:
        vm = get_vm(cmd, resource_group_name, vm_name, 'instanceView')
        client = _compute_client_factory(cmd.cli_ctx)
        if vm.storage_profile.os_disk.os_type == 'Linux':
            publisher = 'Microsoft.Azure.Security.LinuxAttestation'
        if vm.storage_profile.os_disk.os_type == 'Windows':
            publisher = 'Microsoft.Azure.Security.WindowsAttestation'
        version = _normalize_extension_version(cmd.cli_ctx, publisher, 'GuestAttestation', None, vm.location)
        VirtualMachineExtension = cmd.get_models('VirtualMachineExtension')
        ext = VirtualMachineExtension(location=vm.location,
                                      publisher=publisher,
                                      type_properties_type='GuestAttestation',
                                      protected_settings=None,
                                      type_handler_version=version,
                                      settings=None,
                                      auto_upgrade_minor_version=True,
                                      enable_automatic_upgrade=not disable_integrity_monitoring_autoupgrade)
        try:
            LongRunningOperation(cmd.cli_ctx)(client.virtual_machine_extensions.begin_create_or_update(
                resource_group_name, vm_name, 'GuestAttestation', ext))
            logger.info('Guest Attestation Extension has been successfully installed by default '
                        'when Trusted Launch configuration is met')
        except Exception as e:
            error_type = "Trusted Launch" if is_trusted_launch else "Confidential VM"
            logger.error('Failed to install Guest Attestation Extension for %s. %s', error_type, e)
    if count:
        vm_names = [vm_name + str(i) for i in range(count)]
    else:
        vm_names = [vm_name]
    vms = []
    # Use vm_name2 to avoid R1704: Redefining argument with the local name 'vm_name' (redefined-argument-from-local)
    for vm_name2 in vm_names:
        vm = get_vm_details(cmd, resource_group_name, vm_name2)
        if assign_identity is not None:
            if enable_local_identity and not identity_scope:
                _show_missing_access_warning(resource_group_name, vm_name2, 'vm')
            setattr(vm, 'identity', _construct_identity_info(identity_scope, identity_role, vm.identity.principal_id,
                                                             vm.identity.user_assigned_identities))
        vms.append(vm)

    if workspace is not None:
        workspace_name = parse_resource_id(workspace_id)['name']
        _set_data_source_for_workspace(cmd, os_type, resource_group_name, workspace_name)

    if len(vms) == 1:
        return vms[0]
    return vms