in python/az/aro/azext_aro/custom.py [0:0]
def aro_create(cmd,  # pylint: disable=too-many-locals
               client,
               resource_group_name,
               resource_name,
               master_subnet,
               worker_subnet,
               vnet=None,  # pylint: disable=unused-argument
               vnet_resource_group_name=None,  # pylint: disable=unused-argument
               enable_preconfigured_nsg=None,
               location=None,
               pull_secret=None,
               domain=None,
               cluster_resource_group=None,
               fips_validated_modules=None,
               client_id=None,
               client_secret=None,
               pod_cidr=None,
               service_cidr=None,
               outbound_type=None,
               disk_encryption_set=None,
               master_encryption_at_host=False,
               master_vm_size=None,
               worker_encryption_at_host=False,
               worker_vm_size=None,
               worker_vm_disk_size_gb=None,
               worker_count=None,
               apiserver_visibility=None,
               ingress_visibility=None,
               load_balancer_managed_outbound_ip_count=None,
               enable_managed_identity=False,
               platform_workload_identities=None,
               mi_user_assigned=None,
               tags=None,
               version=None,
               no_wait=False):
    """Build an OpenShiftCluster model and submit it via begin_create_or_update.

    Steps, in order:

    1. Unless ``rp_mode_development()`` is true, check that the
       ``Microsoft.RedHatOpenShift`` provider is registered.
    2. Run ``validate_subnets`` and ``validate`` on the supplied arguments.
    3. In service principal mode (``enable_managed_identity`` falsy), create
       an AAD application when ``client_id`` is None and look up (or create)
       its service principal.
    4. Look up the RP service principal.
    5. Fill in defaults for every unset option and assemble the cluster model.
    6. Attach either the managed identity settings or the service principal
       profile; in service principal mode also call
       ``ensure_resource_permissions``.

    Defaults applied when an option is falsy/None:
      * worker_vm_size: ``Standard_D2s_v5`` when ``rp_mode_development()`` is
        true, otherwise ``Standard_D4s_v5``
      * master_vm_size: ``Standard_D8s_v5``
      * pod_cidr: ``10.128.0.0/14``; service_cidr: ``172.30.0.0/16``
      * worker_vm_disk_size_gb: ``128``; worker_count: ``3``
      * apiserver_visibility / ingress_visibility: ``Public``
      * domain: a generated random id
      * cluster_resource_group: ``'aro-'`` followed by the same random id

    :param cmd: CLI command context; ``cmd.cli_ctx`` is used to build clients.
    :param client: client exposing ``open_shift_clusters.begin_create_or_update``.
    :param enable_managed_identity: truthy selects the managed identity path
        (``platform_workload_identities`` and ``mi_user_assigned`` are used);
        falsy selects the service principal path (``client_id`` and
        ``client_secret`` are used).
    :param no_wait: forwarded to ``sdk_no_wait``.
    :return: whatever ``sdk_no_wait`` returns for the create-or-update call.
    :raises UnauthorizedError: the resource provider is not registered.
    :raises ResourceNotFoundError: the RP service principal was not found.
    """
    # Provider registration is only checked when rp_mode_development() is false.
    if not rp_mode_development():
        resource_client = get_mgmt_service_client(
            cmd.cli_ctx, ResourceType.MGMT_RESOURCE_RESOURCES)
        provider = resource_client.providers.get('Microsoft.RedHatOpenShift')
        if provider.registration_state != 'Registered':
            raise UnauthorizedError('Microsoft.RedHatOpenShift provider is not registered.',
                                    'Run `az provider register -n Microsoft.RedHatOpenShift --wait`.')

    validate_subnets(master_subnet, worker_subnet)

    validate(cmd,
             client,
             resource_group_name,
             resource_name,
             master_subnet,
             worker_subnet,
             vnet=vnet,
             enable_preconfigured_nsg=enable_preconfigured_nsg,
             cluster_resource_group=cluster_resource_group,
             client_id=client_id,
             client_secret=client_secret,
             vnet_resource_group_name=vnet_resource_group_name,
             disk_encryption_set=disk_encryption_set,
             location=location,
             version=version,
             pod_cidr=pod_cidr,
             service_cidr=service_cidr,
             warnings_as_text=True)

    subscription_id = get_subscription_id(cmd.cli_ctx)

    random_id = generate_random_id()

    # Used both as the AAD application name and as the last segment of the
    # cluster resource group id, so compute it once.
    cluster_rg_name = cluster_resource_group or 'aro-' + random_id

    # Evaluate the flag a single time so the two identity-dependent sections
    # below always take the same branch. Mixing a truthiness test with an
    # `is True` test left `client_sp_id` unbound for truthy non-bool values.
    use_managed_identity = bool(enable_managed_identity)

    aad = AADManager(cmd.cli_ctx)

    if not use_managed_identity:
        if client_id is None:
            client_id, client_secret = aad.create_application(cluster_rg_name)

        client_sp_id = aad.get_service_principal_id(client_id)
        if not client_sp_id:
            client_sp_id = aad.create_service_principal(client_id)

    rp_client_sp_id = aad.get_service_principal_id(resolve_rp_client_id())
    if not rp_client_sp_id:
        raise ResourceNotFoundError("RP service principal not found.")

    if rp_mode_development():
        worker_vm_size = worker_vm_size or 'Standard_D2s_v5'
    else:
        worker_vm_size = worker_vm_size or 'Standard_D4s_v5'

    # The model is given 'Public'-style capitalised values regardless of how
    # the caller cased them.
    if apiserver_visibility is not None:
        apiserver_visibility = apiserver_visibility.capitalize()

    if ingress_visibility is not None:
        ingress_visibility = ingress_visibility.capitalize()

    # A load balancer profile is only sent when an outbound IP count was given.
    load_balancer_profile = None
    if load_balancer_managed_outbound_ip_count is not None:
        load_balancer_profile = openshiftcluster.LoadBalancerProfile()
        load_balancer_profile.managed_outbound_ips = openshiftcluster.ManagedOutboundIPs()
        load_balancer_profile.managed_outbound_ips.count = load_balancer_managed_outbound_ip_count  # pylint: disable=line-too-long

    oc = openshiftcluster.OpenShiftCluster(
        location=location,
        tags=tags,
        cluster_profile=openshiftcluster.ClusterProfile(
            pull_secret=pull_secret or "",
            domain=domain or random_id,
            resource_group_id=(f"/subscriptions/{subscription_id}"
                               f"/resourceGroups/{cluster_rg_name}"),
            fips_validated_modules='Enabled' if fips_validated_modules else 'Disabled',
            version=version or '',
        ),
        network_profile=openshiftcluster.NetworkProfile(
            pod_cidr=pod_cidr or '10.128.0.0/14',
            service_cidr=service_cidr or '172.30.0.0/16',
            outbound_type=outbound_type or '',
            load_balancer_profile=load_balancer_profile,
            preconfigured_nsg='Enabled' if enable_preconfigured_nsg else 'Disabled',
        ),
        master_profile=openshiftcluster.MasterProfile(
            vm_size=master_vm_size or 'Standard_D8s_v5',
            subnet_id=master_subnet,
            encryption_at_host='Enabled' if master_encryption_at_host else 'Disabled',
            disk_encryption_set_id=disk_encryption_set,
        ),
        worker_profiles=[
            openshiftcluster.WorkerProfile(
                name='worker',  # TODO: 'worker' should not be hard-coded
                vm_size=worker_vm_size,
                disk_size_gb=worker_vm_disk_size_gb or 128,
                subnet_id=worker_subnet,
                count=worker_count or 3,
                encryption_at_host='Enabled' if worker_encryption_at_host else 'Disabled',
                disk_encryption_set_id=disk_encryption_set,
            )
        ],
        apiserver_profile=openshiftcluster.APIServerProfile(
            visibility=apiserver_visibility or 'Public',
        ),
        ingress_profiles=[
            openshiftcluster.IngressProfile(
                name='default',  # TODO: 'default' should not be hard-coded
                visibility=ingress_visibility or 'Public',
            )
        ],
        # Exactly one of the two identity profiles is filled in below.
        service_principal_profile=None,
        platform_workload_identity_profile=None,
    )

    if use_managed_identity:
        # NOTE(review): dict(None) raises TypeError and a None key would be
        # sent for the user-assigned identity, so both arguments are
        # presumably validated as present before this point - confirm.
        oc.platform_workload_identity_profile = openshiftcluster.PlatformWorkloadIdentityProfile(
            platform_workload_identities=dict(platform_workload_identities)
        )

        oc.identity = openshiftcluster.ManagedServiceIdentity(
            type='UserAssigned',
            user_assigned_identities={mi_user_assigned: {}}
        )

        # TODO - perform client-side validation of required identity permissions

    else:
        oc.service_principal_profile = openshiftcluster.ServicePrincipalProfile(
            client_id=client_id,
            client_secret=client_secret,
        )

        # Permissions are ensured for both the cluster and the RP principals.
        sp_obj_ids = [client_sp_id, rp_client_sp_id]
        ensure_resource_permissions(cmd.cli_ctx, oc, True, sp_obj_ids)

    return sdk_no_wait(no_wait, client.open_shift_clusters.begin_create_or_update,
                       resource_group_name=resource_group_name,
                       resource_name=resource_name,
                       parameters=oc)