def create_cluster()

in src/azure-cli/azure/cli/command_modules/hdinsight/custom.py [0:0]


def create_cluster(cmd, client, cluster_name, resource_group_name, cluster_type,
                   location=None, tags=None, no_wait=False, cluster_version='default', cluster_tier=None,
                   cluster_configurations=None, component_version=None,
                   headnode_size=None, workernode_size=None, zookeepernode_size=None, edgenode_size=None,
                   kafka_management_node_size=None, kafka_management_node_count=2,
                   kafka_client_group_id=None, kafka_client_group_name=None,
                   workernode_count=3, workernode_data_disks_per_node=None,
                   workernode_data_disk_storage_account_type=None, workernode_data_disk_size=None,
                   http_username=None, http_password=None,
                   ssh_username='sshuser', ssh_password=None, ssh_public_key=None,
                   storage_account=None, storage_account_key=None,
                   storage_default_container=None, storage_default_filesystem=None,
                   storage_account_managed_identity=None,
                   vnet_name=None, subnet=None,
                   domain=None, ldaps_urls=None,
                   cluster_admin_account=None, cluster_admin_password=None,
                   cluster_users_group_dns=None,
                   assign_identity=None,
                   minimal_tls_version=None,
                   encryption_vault_uri=None, encryption_key_name=None, encryption_key_version=None,
                   encryption_algorithm='RSA-OAEP', encryption_in_transit=None,
                   autoscale_type=None, autoscale_min_workernode_count=None, autoscale_max_workernode_count=None,
                   timezone=None, days=None, time=None, autoscale_workernode_count=None,
                   encryption_at_host=None, esp=False, idbroker=False,
                   resource_provider_connection=None, enable_private_link=None,
                   public_ip_tag_type=None, public_ip_tag_value=None,
                   enable_compute_isolation=None, host_sku=None, zones=None, private_link_configurations=None,
                   no_validation_timeout=False, outbound_dependencies_managed_type=None):
    from .util import build_identities_info, build_virtual_network_profile, parse_domain_name, \
        get_storage_account_endpoint, validate_esp_cluster_create_params, set_vm_size
    from azure.mgmt.hdinsight.models import ClusterCreateParametersExtended, ClusterCreateProperties, OSType, \
        ClusterDefinition, ComputeProfile, HardwareProfile, Role, OsProfile, LinuxOperatingSystemProfile, \
        StorageProfile, StorageAccount, DataDisksGroups, SecurityProfile, \
        DiskEncryptionProperties, Tier, SshProfile, SshPublicKey, \
        KafkaRestProperties, ClientGroupInfo, EncryptionInTransitProperties, \
        Autoscale, AutoscaleCapacity, AutoscaleRecurrence, AutoscaleSchedule, AutoscaleTimeAndCapacity, \
        NetworkProperties, IpTag, PrivateLink, ComputeIsolationProperties

    validate_esp_cluster_create_params(esp, cluster_name, resource_group_name, cluster_type,
                                       subnet, domain, cluster_admin_account, assign_identity,
                                       ldaps_urls, cluster_admin_password, cluster_users_group_dns)

    if esp:
        if cluster_tier == Tier.standard:
            raise CLIError('Cluster tier cannot be {} when --esp is specified. '
                           'Please use default value or specify {} explicitly.'.format(Tier.standard, Tier.premium))
        if not cluster_tier:
            cluster_tier = Tier.premium

    # Update optional parameters with defaults
    location = location or _get_rg_location(cmd.cli_ctx, resource_group_name)

    # Format dictionary/free-form arguments
    if not cluster_configurations:
        cluster_configurations = {}

    if component_version:
        # See validator
        # pylint: disable=consider-using-dict-comprehension
        component_version = dict([version.split('=') for version in component_version])

    # Validate whether HTTP credentials were provided
    if 'gateway' in cluster_configurations:
        gateway_config = cluster_configurations['gateway']
    else:
        gateway_config = {}
    if http_username and 'restAuthCredential.username' in gateway_config:
        raise CLIError('An HTTP username must be specified either as a command-line parameter '
                       'or in the cluster configuration, but not both.')
    if not http_username:
        http_username = 'admin'  # Implement default logic here, in case a user specifies the username in configurations

    if not http_password:
        try:
            http_password = prompt_pass('HTTP password for the cluster:', confirm=True)
        except NoTTYException:
            raise CLIError('Please specify --http-password in non-interactive mode.')

    # Update the cluster config with the HTTP credentials
    gateway_config['restAuthCredential.isEnabled'] = 'true'  # HTTP credentials are required
    http_username = http_username or gateway_config['restAuthCredential.username']
    gateway_config['restAuthCredential.username'] = http_username
    gateway_config['restAuthCredential.password'] = http_password
    cluster_configurations['gateway'] = gateway_config

    # Validate whether SSH credentials were provided
    if not (ssh_password or ssh_public_key):
        logger.warning("SSH credentials not specified. Using the HTTP password as the SSH password.")
        ssh_password = http_password

    # Validate storage arguments from the user
    if storage_default_container and storage_default_filesystem:
        raise CLIError('Either the default container or the default filesystem can be specified, but not both.')

    # Retrieve primary blob service endpoint
    is_wasb = not storage_account_managed_identity
    storage_account_endpoint = None
    if storage_account:
        storage_account_endpoint = get_storage_account_endpoint(cmd, storage_account, is_wasb)

    # Attempt to infer the storage account key from the endpoint
    if not storage_account_key and storage_account and is_wasb:
        from .util import get_key_for_storage_account
        logger.info('Storage account key not specified. Attempting to retrieve key...')
        key = get_key_for_storage_account(cmd, storage_account)
        if not key:
            raise CLIError('Storage account key could not be inferred from storage account.')
        storage_account_key = key

    # Attempt to provide a default container for WASB storage accounts
    if not storage_default_container and storage_account and is_wasb:
        storage_default_container = cluster_name.lower()
        logger.warning('Default WASB container not specified, using "%s".', storage_default_container)
    elif not storage_default_filesystem and not is_wasb:
        storage_default_filesystem = cluster_name.lower()
        logger.warning('Default ADLS file system not specified, using "%s".', storage_default_filesystem)

    # Validate storage info parameters
    if is_wasb and not _all_or_none(storage_account, storage_account_key, storage_default_container):
        raise CLIError('If storage details are specified, the storage account, storage account key, '
                       'and the default container must be specified.')
    if not is_wasb and not _all_or_none(storage_account, storage_default_filesystem):
        raise CLIError('If storage details are specified, the storage account, '
                       'and the default filesystem must be specified.')

    # Validate disk encryption parameters
    if not _all_or_none(encryption_vault_uri, encryption_key_name, encryption_key_version):
        raise CLIError('Either the encryption vault URI, key name and key version should be specified, '
                       'or none of them should be.')

    # Validate kafka rest proxy parameters
    if not _all_or_none(kafka_client_group_id, kafka_client_group_name):
        raise CLIError('Either the kafka client group id and kafka client group name should be specified, '
                       'or none of them should be')

    # Validate and initialize autoscale setting
    autoscale_configuration = None
    load_based_type = "Load"
    schedule_based_type = "Schedule"
    if autoscale_type and autoscale_type.lower() == load_based_type.lower():
        if not all([autoscale_min_workernode_count, autoscale_max_workernode_count]):
            raise CLIError(
                'When the --autoscale-type is Load, '
                'both --autoscale-min-workernode-count and --autoscale-max-workernode-count should be specified.')

        autoscale_configuration = Autoscale(
            capacity=AutoscaleCapacity(
                min_instance_count=autoscale_min_workernode_count,
                max_instance_count=autoscale_max_workernode_count
            )
        )
    elif autoscale_type and autoscale_type.lower() == schedule_based_type.lower():
        if not all([timezone, days, time, autoscale_workernode_count]):
            raise CLIError(
                'When the --autoscale-type is Schedule, all of the --timezone, --days, --time, '
                '--autoscale-workernode-count should be specified.')

        autoscale_configuration = Autoscale(
            recurrence=AutoscaleRecurrence(
                time_zone=timezone,
                schedule=[AutoscaleSchedule(
                    days=days,
                    time_and_capacity=AutoscaleTimeAndCapacity(
                        time=time,
                        min_instance_count=autoscale_workernode_count,
                        max_instance_count=autoscale_workernode_count
                    )
                )]
            )
        )

    # Specify virtual network profile only when network arguments are provided
    virtual_network_profile = subnet and build_virtual_network_profile(subnet)

    # Validate data disk parameters
    if not workernode_data_disks_per_node and workernode_data_disk_storage_account_type:
        raise CLIError("Cannot define data disk storage account type unless disks per node is defined.")
    if not workernode_data_disks_per_node and workernode_data_disk_size:
        raise CLIError("Cannot define data disk size unless disks per node is defined.")
    # Specify data disk groups only when disk arguments are provided
    workernode_data_disk_groups = workernode_data_disks_per_node and [
        DataDisksGroups(
            disks_per_node=workernode_data_disks_per_node,
            storage_account_type=workernode_data_disk_storage_account_type,
            disk_size_gb=workernode_data_disk_size
        )
    ]

    # call get default vm size api to set vm size if customer does not provide the value
    if not (workernode_size and headnode_size):
        headnode_size, workernode_size = set_vm_size(cmd.cli_ctx, location, cluster_type, headnode_size,
                                                     workernode_size)

    if not headnode_size:
        raise RequiredArgumentMissingError('Please specify --headnode-size explicitly.')
    if not workernode_size:
        raise RequiredArgumentMissingError('Please specify --workernode-size explicitly.')

    os_profile = OsProfile(
        linux_operating_system_profile=LinuxOperatingSystemProfile(
            username=ssh_username,
            password=ssh_password,
            ssh_profile=ssh_public_key and SshProfile(
                public_keys=[SshPublicKey(
                    certificate_data=ssh_public_key
                )]
            )
        )
    )

    roles = [
        # Required roles
        Role(
            name="headnode",
            target_instance_count=2,
            hardware_profile=HardwareProfile(vm_size=headnode_size),
            os_profile=os_profile,
            virtual_network_profile=virtual_network_profile
        ),
        Role(
            name="workernode",
            target_instance_count=workernode_count,
            hardware_profile=HardwareProfile(vm_size=workernode_size),
            os_profile=os_profile,
            virtual_network_profile=virtual_network_profile,
            data_disks_groups=workernode_data_disk_groups,
            autoscale_configuration=autoscale_configuration
        )
    ]

    if zookeepernode_size:
        roles.append(
            Role(
                name="zookeepernode",
                target_instance_count=3,
                hardware_profile=HardwareProfile(vm_size=zookeepernode_size),
                os_profile=os_profile,
                virtual_network_profile=virtual_network_profile
            ))
    if edgenode_size:
        roles.append(
            Role(
                name="edgenode",
                target_instance_count=1,
                hardware_profile=HardwareProfile(vm_size=edgenode_size),
                os_profile=os_profile,
                virtual_network_profile=virtual_network_profile
            ))
    if kafka_management_node_size:
        # generate kafkaRestProperties
        roles.append(
            Role(
                name="kafkamanagementnode",
                target_instance_count=kafka_management_node_count,
                hardware_profile=HardwareProfile(vm_size=kafka_management_node_size),
                os_profile=os_profile,
                virtual_network_profile=virtual_network_profile
            )
        )

    if esp and idbroker:
        roles.append(
            Role(
                name="idbrokernode",
                target_instance_count=2,
                virtual_network_profile=virtual_network_profile
            )
        )

    storage_accounts = []
    if storage_account:
        # Specify storage account details only when storage arguments are provided
        storage_accounts.append(
            StorageAccount(
                name=storage_account_endpoint,
                key=storage_account_key,
                container=storage_default_container,
                file_system=storage_default_filesystem,
                resource_id=storage_account,
                msi_resource_id=storage_account_managed_identity,
                is_default=True
            )
        )

    additional_storage_accounts = []  # TODO: Add support for additional storage accounts
    if additional_storage_accounts:
        storage_accounts += [
            StorageAccount(
                name=s.storage_account_endpoint,
                key=s.storage_account_key,
                container=s.container,
                is_default=False
            )
            for s in additional_storage_accounts
        ]

    assign_identities = []
    if assign_identity:
        assign_identities.append(assign_identity)

    if storage_account_managed_identity:
        assign_identities.append(storage_account_managed_identity)

    cluster_identity = build_identities_info(assign_identities) if assign_identities else None

    domain_name = domain and parse_domain_name(domain)
    if not ldaps_urls and domain_name:
        ldaps_urls = ['ldaps://{}:636'.format(domain_name)]

    security_profile = domain and SecurityProfile(
        directory_type=SecurityProfile.directory_type,
        domain=domain_name,
        ldaps_urls=ldaps_urls,
        domain_username=cluster_admin_account,
        domain_user_password=cluster_admin_password,
        cluster_users_group_dns=cluster_users_group_dns,
        aadds_resource_id=domain,
        msi_resource_id=assign_identity
    )

    disk_encryption_properties = encryption_vault_uri and DiskEncryptionProperties(
        vault_uri=encryption_vault_uri,
        key_name=encryption_key_name,
        key_version=encryption_key_version,
        encryption_algorithm=encryption_algorithm,
        msi_resource_id=assign_identity
    )

    if encryption_at_host:
        if disk_encryption_properties:
            disk_encryption_properties.encryption_at_host = encryption_at_host
        else:
            disk_encryption_properties = DiskEncryptionProperties(encryption_at_host=encryption_at_host)

    kafka_rest_properties = KafkaRestProperties(
        client_group_info=ClientGroupInfo(
            group_id=kafka_client_group_id,
            group_name=kafka_client_group_name
        )
    ) if (kafka_client_group_id and kafka_client_group_name) else None

    encryption_in_transit_properties = EncryptionInTransitProperties(
        is_encryption_in_transit_enabled=encryption_in_transit
    ) if encryption_in_transit is True else None

    # relay outbound and private link
    network_properties = NetworkProperties(
        resource_provider_connection=resource_provider_connection,
        private_link=PrivateLink.enabled if enable_private_link is True else PrivateLink.disabled
    ) if (resource_provider_connection is not None or enable_private_link is not None) else None
    if outbound_dependencies_managed_type:
        network_properties.outbound_dependencies_managed_type = outbound_dependencies_managed_type
    if public_ip_tag_type and public_ip_tag_value:
        network_properties.public_ip_tag = IpTag(
            ip_tag_type=public_ip_tag_type,
            tag=public_ip_tag_value
        )

    # compute isolation
    compute_isolation_properties = ComputeIsolationProperties(
        enable_compute_isolation=enable_compute_isolation,
        host_sku=host_sku
    ) if enable_compute_isolation is True else None

    create_params = ClusterCreateParametersExtended(
        location=location,
        tags=tags,
        zones=zones,
        properties=ClusterCreateProperties(
            cluster_version=cluster_version,
            os_type=OSType.linux,
            tier=cluster_tier,
            cluster_definition=ClusterDefinition(
                kind=cluster_type,
                configurations=cluster_configurations,
                component_version=component_version
            ),
            compute_profile=ComputeProfile(
                roles=roles
            ),
            storage_profile=StorageProfile(
                storageaccounts=storage_accounts
            ),
            security_profile=security_profile,
            disk_encryption_properties=disk_encryption_properties,
            kafka_rest_properties=kafka_rest_properties,
            min_supported_tls_version=minimal_tls_version,
            encryption_in_transit_properties=encryption_in_transit_properties,
            network_properties=network_properties,
            compute_isolation_properties=compute_isolation_properties,
            private_link_configurations=private_link_configurations
        ),
        identity=cluster_identity
    )

    return sdk_no_wait(no_wait, client.begin_create, resource_group_name, cluster_name, create_params)