def parent_construct_payload()

in src/containerapp/azext_containerapp/containerapp_decorator.py [0:0]


    def parent_construct_payload(self):
        """Fill ``self.containerapp_def`` for a create request from the command arguments.

        If a YAML file argument is set, the work is delegated to
        ``set_up_create_containerapp_yaml`` and its result is returned
        immediately. Otherwise the method:

        1. looks up the managed environment and (unless no-wait is set) polls
           until its provisioning state is no longer "inprogress"/"updating";
        2. builds the configuration section (secrets, ingress, registries, Dapr);
        3. builds the identity section;
        4. builds the template section (container, scale, optional secret volume);
        5. writes location, identity, environment id, configuration, template,
           tags and (if any) the workload profile name into ``self.containerapp_def``.

        Arguments this method may overwrite as a side effect:
        image (defaults to ``HELLO_WORLD_IMAGE``), workload profile name
        (defaulted from the environment), registry user / password (inferred).

        Raises:
            RequiredArgumentMissingError: no YAML file and no managed environment given.
            ValidationError: the environment lookup produced no result.
        """
        # preview logic
        self.check_create_acrpull_role_assignment()
        # end preview logic

        # YAML input short-circuits everything below.
        if self.get_argument_yaml():
            return self.set_up_create_containerapp_yaml(name=self.get_argument_name(), file_name=self.get_argument_yaml())

        # Fall back to the sample image when no image (None or empty) was supplied.
        if not self.get_argument_image():
            self.set_argument_image(HELLO_WORLD_IMAGE)

        if self.get_argument_managed_env() is None:
            raise RequiredArgumentMissingError('Usage error: --environment is required if not using --yaml')

        # Validate managed environment
        # NOTE(review): the 'name' and 'resource_group' keys are read unconditionally, so the
        # argument is assumed to be a full resource ID by this point -- confirm against the caller.
        parsed_managed_env = parse_resource_id(self.get_argument_managed_env())
        managed_env_name = parsed_managed_env['name']
        managed_env_rg = parsed_managed_env['resource_group']
        managed_env_info = None

        try:
            managed_env_info = self.get_environment_client().show(cmd=self.cmd, resource_group_name=managed_env_rg, name=managed_env_name)
        except Exception as e:
            # Presumably re-raises anything that is not a 404, leaving managed_env_info as None
            # for the "does not exist" check below -- confirm in the helper.
            handle_non_404_status_code_exception(e)

        if not managed_env_info:
            raise ValidationError("The environment '{}' does not exist. Specify a valid environment".format(self.get_argument_managed_env()))

        # Poll every 5 seconds while the environment is still provisioning. Skipped entirely when
        # no-wait is set. The loop has no timeout of its own; it ends only when the state changes
        # or the show() call raises.
        while not self.get_argument_no_wait() and safe_get(managed_env_info, "properties", "provisioningState", default="").lower() in ["inprogress", "updating"]:
            logger.info("Waiting for environment provisioning to finish before creating container app")
            time.sleep(5)
            managed_env_info = self.get_environment_client().show(cmd=self.cmd, resource_group_name=managed_env_rg, name=managed_env_name)

        # The app is created in the environment's location.
        location = managed_env_info["location"]
        _ensure_location_allowed(self.cmd, location, CONTAINER_APPS_RP, "containerApps")

        # Only default the workload profile when the user gave none and the environment
        # payload has a top-level "workloadProfiles" key.
        if not self.get_argument_workload_profile_name() and "workloadProfiles" in managed_env_info:
            workload_profile_name = get_default_workload_profile_name_from_env(self.cmd, managed_env_info, managed_env_rg)
            self.set_argument_workload_profile_name(workload_profile_name)

        # Map the ingress argument (case-insensitive) to a boolean; any value other than
        # "internal"/"external" leaves this as None.
        external_ingress = None
        if self.get_argument_ingress() is not None:
            if self.get_argument_ingress().lower() == "internal":
                external_ingress = False
            elif self.get_argument_ingress().lower() == "external":
                external_ingress = True

        # An ingress section is emitted only when BOTH a target port and an ingress type are given.
        ingress_def = None
        if self.get_argument_target_port() is not None and self.get_argument_ingress() is not None:
            ingress_def = deepcopy(IngressModel)
            ingress_def["external"] = external_ingress
            ingress_def["targetPort"] = self.get_argument_target_port()
            ingress_def["transport"] = self.get_argument_transport()
            # exposedPort is forwarded only when transport is exactly "tcp" (case-sensitive compare).
            ingress_def["exposedPort"] = self.get_argument_exposed_port() if self.get_argument_transport() == "tcp" else None
            ingress_def["allowInsecure"] = self.get_argument_allow_insecure()

        secrets_def = None
        if self.get_argument_secrets() is not None:
            secrets_def = parse_secret_flags(self.get_argument_secrets())

        # No registry entry is built here when is_registry_msi_system(...) is true for the
        # registry identity; presumably set_up_registry_identity() at the end of this method
        # covers that case -- confirm.
        registries_def = None
        if self.get_argument_registry_server() is not None and not is_registry_msi_system(self.get_argument_registry_identity()):
            registries_def = deepcopy(RegistryCredentialsModel)
            registries_def["server"] = self.get_argument_registry_server()

            # Infer credentials if not supplied and its azurecr
            # (runs when user or password is missing and no registry identity was given;
            # the inferred values are written back to the arguments and read again below)
            if (self.get_argument_registry_user() is None or self.get_argument_registry_pass() is None) and self.get_argument_registry_identity() is None:
                registry_user, registry_pass = _infer_acr_credentials(self.cmd, self.get_argument_registry_server(), self.get_argument_disable_warnings())
                self.set_argument_registry_user(registry_user)
                self.set_argument_registry_pass(registry_pass)

            if not self.get_argument_registry_identity():
                # Username/password authentication.
                registries_def["username"] = self.get_argument_registry_user()

                # Make sure the helper receives a list even when no secrets were supplied.
                # Presumably the helper adds the registry password to secrets_def in place -- confirm.
                if secrets_def is None:
                    secrets_def = []
                registries_def["passwordSecretRef"] = store_as_secret_and_return_secret_ref(secrets_def, self.get_argument_registry_user(),
                                                                                            self.get_argument_registry_server(),
                                                                                            self.get_argument_registry_pass(),
                                                                                            disable_warnings=self.get_argument_disable_warnings())
            else:
                # Identity-based authentication: the identity value is passed through unchanged.
                registries_def["identity"] = self.get_argument_registry_identity()

        # Dapr settings are included only when Dapr is enabled; the remaining Dapr arguments are
        # copied as-is (they may be None).
        dapr_def = None
        if self.get_argument_dapr_enabled():
            dapr_def = deepcopy(DaprModel)
            dapr_def["enabled"] = True
            dapr_def["appId"] = self.get_argument_dapr_app_id()
            dapr_def["appPort"] = self.get_argument_dapr_app_port()
            dapr_def["appProtocol"] = self.get_argument_dapr_app_protocol()
            dapr_def["httpReadBufferSize"] = self.get_argument_dapr_http_read_buffer_size()
            dapr_def["httpMaxRequestSize"] = self.get_argument_dapr_http_max_request_size()
            dapr_def["logLevel"] = self.get_argument_dapr_log_level()
            dapr_def["enableApiLogging"] = self.get_argument_dapr_enable_api_logging()

        # Assemble the configuration section. secrets_def is read here, after the registry block
        # above has had the chance to create or extend it.
        config_def = deepcopy(ConfigurationModel)
        config_def["secrets"] = secrets_def
        config_def["activeRevisionsMode"] = self.get_argument_revisions_mode()
        config_def["targetLabel"] = self.get_argument_target_label()
        config_def["ingress"] = ingress_def
        # "registries" is either a one-element list or None.
        config_def["registries"] = [registries_def] if registries_def is not None else None
        config_def["dapr"] = dapr_def

        # Identity actions
        identity_def = deepcopy(ManagedServiceIdentityModel)
        # The string "None" (not the Python None) is the default identity type.
        identity_def["type"] = "None"

        assign_system_identity = self.get_argument_system_assigned()
        if self.get_argument_user_assigned():
            # User-assigned identity values are lower-cased before use.
            assign_user_identities = [x.lower() for x in self.get_argument_user_assigned()]
        else:
            assign_user_identities = []

        if assign_system_identity and assign_user_identities:
            identity_def["type"] = "SystemAssigned, UserAssigned"
        elif assign_system_identity:
            identity_def["type"] = "SystemAssigned"
        elif assign_user_identities:
            identity_def["type"] = "UserAssigned"

        if assign_user_identities:
            identity_def["userAssignedIdentities"] = {}
            subscription_id = get_subscription_id(self.cmd.cli_ctx)

            for r in assign_user_identities:
                # Presumably expands a bare identity name into a full resource ID using the
                # subscription and resource group -- confirm in the helper.
                r = _ensure_identity_resource_id(subscription_id, self.get_argument_resource_group_name(), r)
                identity_def["userAssignedIdentities"][r] = {}  # pylint: disable=unsupported-assignment-operation

        scale_def = self.set_up_scale_rule()

        # A resources section is emitted when either cpu or memory is given; the other value is
        # copied as-is and may be None.
        resources_def = None
        if self.get_argument_cpu() is not None or self.get_argument_memory() is not None:
            resources_def = deepcopy(ContainerResourcesModel)
            resources_def["cpu"] = self.get_argument_cpu()
            resources_def["memory"] = self.get_argument_memory()

        container_def = deepcopy(ContainerModel)
        # The container name falls back to the app name when no container name is given.
        container_def["name"] = self.get_argument_container_name() if self.get_argument_container_name() else self.get_argument_name()
        # When is_registry_msi_system(...) is true the requested image is NOT used here;
        # HELLO_WORLD_IMAGE is substituted instead.
        container_def["image"] = self.get_argument_image() if not is_registry_msi_system(self.get_argument_registry_identity()) else HELLO_WORLD_IMAGE
        if self.get_argument_env_vars() is not None:
            container_def["env"] = parse_env_var_flags(self.get_argument_env_vars())
        if self.get_argument_startup_command() is not None:
            container_def["command"] = self.get_argument_startup_command()
        if self.get_argument_args() is not None:
            container_def["args"] = self.get_argument_args()
        if resources_def is not None:
            container_def["resources"] = resources_def

        template_def = deepcopy(TemplateModel)

        template_def["containers"] = [container_def]
        template_def["scale"] = scale_def

        # Optional secret volume: one volume of storage type "Secret" with a generated name,
        # mounted into the single container at the requested path.
        if self.get_argument_secret_volume_mount() is not None:
            volume_def = deepcopy(VolumeModel)
            volume_mount_def = deepcopy(VolumeMountModel)
            # generate a volume name
            volume_def["name"] = _generate_secret_volume_name()
            volume_def["storageType"] = "Secret"

            # mount the volume to the container
            volume_mount_def["volumeName"] = volume_def["name"]
            volume_mount_def["mountPath"] = self.get_argument_secret_volume_mount()
            container_def["volumeMounts"] = [volume_mount_def]
            template_def["volumes"] = [volume_def]

        # The revision suffix is left out when is_registry_msi_system(...) is true, matching the
        # image substitution above.
        if self.get_argument_revision_suffix() is not None and not is_registry_msi_system(self.get_argument_registry_identity()):
            template_def["revisionSuffix"] = self.get_argument_revision_suffix()

        if self.get_argument_termination_grace_period() is not None:
            template_def["terminationGracePeriodSeconds"] = self.get_argument_termination_grace_period()

        # Write the assembled sections into the request payload.
        self.containerapp_def["location"] = location
        self.containerapp_def["identity"] = identity_def
        self.containerapp_def["properties"]["environmentId"] = self.get_argument_managed_env()
        self.containerapp_def["properties"]["configuration"] = config_def
        self.containerapp_def["properties"]["template"] = template_def
        self.containerapp_def["tags"] = self.get_argument_tags()

        # The workload profile name may have been defaulted from the environment earlier in this
        # method; it is checked against the environment before being used.
        if self.get_argument_workload_profile_name():
            self.containerapp_def["properties"]["workloadProfileName"] = self.get_argument_workload_profile_name()
            ensure_workload_profile_supported(self.cmd, managed_env_name, managed_env_rg, self.get_argument_workload_profile_name(),
                                              managed_env_info)

        # preview logic
        self.set_up_registry_identity()