in src/azure-cli/azure/cli/command_modules/containerapp/containerapp_decorator.py [0:0]
def construct_payload(self):
    """Assemble the container app create request into ``self.containerapp_def``.

    When a YAML file argument is supplied, the work is delegated to
    ``set_up_create_containerapp_yaml`` and its result is returned.
    Otherwise the managed environment is looked up and the configuration,
    identity, and template sections are built from the command arguments
    and stored on ``self.containerapp_def``.

    Returns:
        The result of ``set_up_create_containerapp_yaml`` on the YAML path;
        otherwise ``None`` (the payload is written to ``self.containerapp_def``).

    Raises:
        RequiredArgumentMissingError: no environment argument was given and
            YAML is not being used.
        ValidationError: the environment lookup returned nothing.
    """
    registry_identity = self.get_argument_registry_identity()
    if registry_identity and not is_registry_msi_system(registry_identity):
        logger.info("Creating an acrpull role assignment for the registry identity")
        create_acrpull_role_assignment(
            self.cmd,
            self.get_argument_registry_server(),
            registry_identity,
            skip_error=True,
        )

    # A YAML definition short-circuits everything below.
    yaml_file = self.get_argument_yaml()
    if yaml_file:
        return self.set_up_create_containerapp_yaml(name=self.get_argument_name(), file_name=yaml_file)

    if not self.get_argument_image():
        self.set_argument_image(HELLO_WORLD_IMAGE)

    env_id = self.get_argument_managed_env()
    if env_id is None:
        raise RequiredArgumentMissingError('Usage error: --environment is required if not using --yaml')

    # ---- Managed environment validation ----
    env_id_parts = parse_resource_id(env_id)
    env_name = env_id_parts['name']
    env_rg = env_id_parts['resource_group']

    env_info = None
    try:
        env_info = self.get_environment_client().show(
            cmd=self.cmd, resource_group_name=env_rg, name=env_name)
    except Exception as ex:
        # NOTE(review): presumably re-raises anything that is not a 404 so a
        # missing environment falls through to the check below -- confirm.
        handle_non_404_status_code_exception(ex)

    if not env_info:
        raise ValidationError("The environment '{}' does not exist. Specify a valid environment".format(env_id))

    # Poll every 5 seconds while the environment is still provisioning,
    # unless the caller asked not to wait.
    pending_states = ["inprogress", "updating"]
    while not self.get_argument_no_wait():
        provisioning_state = safe_get(env_info, "properties", "provisioningState", default="").lower()
        if provisioning_state not in pending_states:
            break
        logger.info("Waiting for environment provisioning to finish before creating container app")
        time.sleep(5)
        env_info = self.get_environment_client().show(
            cmd=self.cmd, resource_group_name=env_rg, name=env_name)

    location = env_info["location"]
    _ensure_location_allowed(self.cmd, location, CONTAINER_APPS_RP, "containerApps")

    # Fall back to the environment's default workload profile when none was given.
    if not self.get_argument_workload_profile_name() and "workloadProfiles" in env_info:
        default_profile = get_default_workload_profile_name_from_env(self.cmd, env_info, env_rg)
        self.set_argument_workload_profile_name(default_profile)

    # ---- Ingress ----
    ingress_def = None
    ingress_mode = self.get_argument_ingress()
    if ingress_mode is not None:
        # Any value other than internal/external leaves "external" as None.
        external_by_mode = {"internal": False, "external": True}
        transport = self.get_argument_transport()

        ingress_def = deepcopy(IngressModel)
        ingress_def["external"] = external_by_mode.get(ingress_mode.lower())
        ingress_def["transport"] = transport
        ingress_def["exposedPort"] = self.get_argument_exposed_port() if transport == "tcp" else None
        ingress_def["allowInsecure"] = self.get_argument_allow_insecure()

        target_port = self.get_argument_target_port()
        if target_port is not None:
            ingress_def["targetPort"] = target_port

    # ---- Secrets ----
    secret_flags = self.get_argument_secrets()
    secrets_def = parse_secret_flags(secret_flags) if secret_flags is not None else None

    # ---- Registry credentials ----
    registries_def = None
    registry_server = self.get_argument_registry_server()
    if registry_server is not None and not is_registry_msi_system(registry_identity):
        registries_def = deepcopy(RegistryCredentialsModel)
        registries_def["server"] = registry_server

        # Infer credentials if not supplied and its azurecr
        credentials_incomplete = (
            self.get_argument_registry_user() is None
            or self.get_argument_registry_pass() is None
        )
        if credentials_incomplete and registry_identity is None:
            inferred_user, inferred_pass = _infer_acr_credentials(
                self.cmd, registry_server, self.get_argument_disable_warnings())
            self.set_argument_registry_user(inferred_user)
            self.set_argument_registry_pass(inferred_pass)

        if registry_identity:
            registries_def["identity"] = registry_identity
        else:
            registries_def["username"] = self.get_argument_registry_user()
            if secrets_def is None:
                secrets_def = []
            # The password is stored in secrets_def and referenced from the registry entry.
            registries_def["passwordSecretRef"] = store_as_secret_and_return_secret_ref(
                secrets_def,
                self.get_argument_registry_user(),
                registry_server,
                self.get_argument_registry_pass(),
                disable_warnings=self.get_argument_disable_warnings(),
            )

    # ---- Dapr ----
    dapr_def = None
    if self.get_argument_dapr_enabled():
        dapr_def = deepcopy(DaprModel)
        dapr_def["enabled"] = True
        dapr_getters = (
            ("appId", self.get_argument_dapr_app_id),
            ("appPort", self.get_argument_dapr_app_port),
            ("appProtocol", self.get_argument_dapr_app_protocol),
            ("httpReadBufferSize", self.get_argument_dapr_http_read_buffer_size),
            ("httpMaxRequestSize", self.get_argument_dapr_http_max_request_size),
            ("logLevel", self.get_argument_dapr_log_level),
            ("enableApiLogging", self.get_argument_dapr_enable_api_logging),
        )
        for dapr_key, read_value in dapr_getters:
            dapr_def[dapr_key] = read_value()

    # ---- Configuration ----
    config_def = deepcopy(ConfigurationModel)
    config_def["secrets"] = secrets_def
    config_def["activeRevisionsMode"] = self.get_argument_revisions_mode()
    config_def["ingress"] = ingress_def
    config_def["registries"] = None if registries_def is None else [registries_def]
    config_def["dapr"] = dapr_def

    # ---- Identity actions ----
    want_system_identity = self.get_argument_system_assigned()
    user_identity_args = self.get_argument_user_assigned()
    user_identities = [ident.lower() for ident in user_identity_args] if user_identity_args else []

    if want_system_identity and user_identities:
        identity_type = "SystemAssigned, UserAssigned"
    elif want_system_identity:
        identity_type = "SystemAssigned"
    elif user_identities:
        identity_type = "UserAssigned"
    else:
        identity_type = "None"

    identity_def = deepcopy(ManagedServiceIdentityModel)
    identity_def["type"] = identity_type

    if user_identities:
        subscription_id = get_subscription_id(self.cmd.cli_ctx)
        resource_group = self.get_argument_resource_group_name()
        identity_def["userAssignedIdentities"] = {
            _ensure_identity_resource_id(subscription_id, resource_group, ident): {}
            for ident in user_identities
        }

    scale_def = self.set_up_scale_rule()

    # ---- Container ----
    cpu = self.get_argument_cpu()
    memory = self.get_argument_memory()
    resources_def = None
    if cpu is not None or memory is not None:
        resources_def = deepcopy(ContainerResourcesModel)
        resources_def["cpu"] = cpu
        resources_def["memory"] = memory

    container_def = deepcopy(ContainerModel)
    container_def["name"] = self.get_argument_container_name() or self.get_argument_name()
    if is_registry_msi_system(self.get_argument_registry_identity()):
        # NOTE(review): with a system registry identity the placeholder image is
        # used here; presumably the requested image is applied later -- confirm.
        container_def["image"] = HELLO_WORLD_IMAGE
    else:
        container_def["image"] = self.get_argument_image()

    env_vars = self.get_argument_env_vars()
    if env_vars is not None:
        container_def["env"] = parse_env_var_flags(env_vars)

    startup_command = self.get_argument_startup_command()
    if startup_command is not None:
        container_def["command"] = startup_command

    container_args = self.get_argument_args()
    if container_args is not None:
        container_def["args"] = container_args

    if resources_def is not None:
        container_def["resources"] = resources_def

    # ---- Template ----
    template_def = deepcopy(TemplateModel)
    template_def["containers"] = [container_def]
    template_def["scale"] = scale_def

    secret_mount_path = self.get_argument_secret_volume_mount()
    if secret_mount_path is not None:
        # Create a generated-name secret volume and mount it into the container.
        secret_volume = deepcopy(VolumeModel)
        secret_volume_mount = deepcopy(VolumeMountModel)
        secret_volume["name"] = _generate_secret_volume_name()
        secret_volume["storageType"] = "Secret"
        secret_volume_mount["volumeName"] = secret_volume["name"]
        secret_volume_mount["mountPath"] = secret_mount_path
        container_def["volumeMounts"] = [secret_volume_mount]
        template_def["volumes"] = [secret_volume]

    revision_suffix = self.get_argument_revision_suffix()
    if revision_suffix is not None and not is_registry_msi_system(registry_identity):
        template_def["revisionSuffix"] = revision_suffix

    grace_period = self.get_argument_termination_grace_period()
    if grace_period is not None:
        template_def["terminationGracePeriodSeconds"] = grace_period

    # ---- Final assembly ----
    self.containerapp_def["location"] = location
    self.containerapp_def["identity"] = identity_def
    app_properties = self.containerapp_def["properties"]
    app_properties["environmentId"] = self.get_argument_managed_env()
    app_properties["configuration"] = config_def
    app_properties["template"] = template_def
    self.containerapp_def["tags"] = self.get_argument_tags()

    workload_profile = self.get_argument_workload_profile_name()
    if workload_profile:
        self.containerapp_def["properties"]["workloadProfileName"] = workload_profile
        ensure_workload_profile_supported(self.cmd, env_name, env_rg, workload_profile, env_info)

    if registry_identity:
        if is_registry_msi_system(registry_identity):
            identity_kwargs = {"system_assigned": True}
        else:
            identity_kwargs = {"user_assigned": [registry_identity]}
        set_managed_identity(
            self.cmd,
            self.get_argument_resource_group_name(),
            self.containerapp_def,
            **identity_kwargs
        )