def construct_payload()

in src/containerapp/azext_containerapp/containerapp_decorator.py [0:0]


    def construct_payload(self):
        # construct from yaml
        if self.get_argument_yaml():
            return self.set_up_update_containerapp_yaml(name=self.get_argument_name(), file_name=self.get_argument_yaml())

        self.new_containerapp["properties"] = {}

        self.set_up_from_revision()

        # Doing this while API has bug. If env var is an empty string, API doesn't return "value" even though the "value" should be an empty string
        for container in safe_get(self.containerapp_def, "properties", "template", "containers", default=[]):
            if "env" in container:
                for e in container["env"]:
                    if "value" not in e:
                        e["value"] = ""

        update_map = {}
        update_map['scale'] = self.get_argument_min_replicas() is not None or self.get_argument_max_replicas() is not None or self.get_argument_scale_rule_name()
        update_map['container'] = self._need_update_container()
        update_map['ingress'] = self.get_argument_ingress() or self.get_argument_target_port()
        update_map['registry'] = self.get_argument_registry_server() or self.get_argument_registry_user() or self.get_argument_registry_pass() or self.get_argument_registry_identity()

        if self.get_argument_tags():
            _add_or_update_tags(self.new_containerapp, self.get_argument_tags())

        if self.get_argument_revision_suffix() is not None:
            self.new_containerapp["properties"]["template"] = {} if "template" not in self.new_containerapp["properties"] else self.new_containerapp["properties"]["template"]
            self.new_containerapp["properties"]["template"]["revisionSuffix"] = self.get_argument_revision_suffix()

        if self.get_argument_termination_grace_period() is not None:
            safe_set(self.new_containerapp, "properties", "template", "terminationGracePeriodSeconds",
                     value=self.get_argument_termination_grace_period())

        if self.get_argument_workload_profile_name():
            self.new_containerapp["properties"]["workloadProfileName"] = self.get_argument_workload_profile_name()

            parsed_managed_env = parse_resource_id(self.containerapp_def["properties"]["environmentId"])
            managed_env_name = parsed_managed_env['name']
            managed_env_rg = parsed_managed_env['resource_group']
            managed_env_info = None
            try:
                managed_env_info = self.get_environment_client().show(cmd=self.cmd, resource_group_name=managed_env_rg, name=managed_env_name)
            except Exception as e:
                handle_non_404_status_code_exception(e)

            if not managed_env_info:
                raise ValidationError(
                    "Error parsing the managed environment '{}' from the specified containerapp".format(
                        managed_env_name))

            ensure_workload_profile_supported(self.cmd, managed_env_name, managed_env_rg, self.get_argument_workload_profile_name(),
                                              managed_env_info)

        # Containers
        if update_map["container"]:
            self.new_containerapp["properties"]["template"] = {} if "template" not in self.new_containerapp["properties"] else self.new_containerapp["properties"]["template"]
            self.new_containerapp["properties"]["template"]["containers"] = self.containerapp_def["properties"]["template"]["containers"]
            if not self.get_argument_container_name():
                if len(self.new_containerapp["properties"]["template"]["containers"]) == 1:
                    container_name = self.new_containerapp["properties"]["template"]["containers"][0]["name"]
                    self.set_argument_container_name(container_name)
                else:
                    raise ValidationError(
                        "Usage error: --container-name is required when adding or updating a container")

            # Check if updating existing container
            updating_existing_container = self.set_up_existing_container_update()
            # If not updating existing container, add as new container
            if not updating_existing_container:
                if self.get_argument_image() is None:
                    raise ValidationError("Usage error: --image is required when adding a new container")

                resources_def = None
                if self.get_argument_cpu() is not None or self.get_argument_memory() is not None:
                    resources_def = ContainerResourcesModel
                    resources_def["cpu"] = self.get_argument_cpu()
                    resources_def["memory"] = self.get_argument_memory()

                container_def = ContainerModel
                container_def["name"] = self.get_argument_container_name()
                container_def["image"] = self.get_argument_image()
                container_def["env"] = []

                if self.get_argument_set_env_vars() is not None:
                    # env vars
                    _add_or_update_env_vars(container_def["env"], parse_env_var_flags(self.get_argument_set_env_vars()))

                if self.get_argument_replace_env_vars() is not None:
                    # env vars
                    _add_or_update_env_vars(container_def["env"], parse_env_var_flags(self.get_argument_replace_env_vars()))

                if self.get_argument_remove_env_vars() is not None:
                    # env vars
                    _remove_env_vars(container_def["env"], self.get_argument_remove_env_vars())

                if self.get_argument_remove_all_env_vars():
                    container_def["env"] = []

                if self.get_argument_startup_command() is not None:
                    if isinstance(self.get_argument_startup_command(), list) and not self.get_argument_startup_command():
                        container_def["command"] = None
                    else:
                        container_def["command"] = self.get_argument_startup_command()
                if self.get_argument_args() is not None:
                    if isinstance(self.get_argument_args(), list) and not self.get_argument_args():
                        container_def["args"] = None
                    else:
                        container_def["args"] = self.get_argument_args()
                if resources_def is not None:
                    container_def["resources"] = resources_def
                if self.get_argument_secret_volume_mount() is not None:
                    self.new_containerapp["properties"]["template"]["volumes"] = self.containerapp_def["properties"]["template"]["volumes"]
                    # generate a new volume name
                    volume_def = VolumeModel
                    volume_mount_def = VolumeMountModel
                    volume_def["name"] = _generate_secret_volume_name()
                    volume_def["storageType"] = "Secret"

                    # mount the volume to the container
                    volume_mount_def["volumeName"] = volume_def["name"]
                    volume_mount_def["mountPath"] = self.get_argument_secret_volume_mount()
                    container_def["volumeMounts"] = [volume_mount_def]
                    if "volumes" not in self.new_containerapp["properties"]["template"]:
                        self.new_containerapp["properties"]["template"]["volumes"] = [volume_def]
                    else:
                        self.new_containerapp["properties"]["template"]["volumes"].append(volume_def)

                self.new_containerapp["properties"]["template"]["containers"].append(container_def)
        # Scale
        if update_map["scale"]:
            self.new_containerapp["properties"]["template"] = {} if "template" not in self.new_containerapp["properties"] else self.new_containerapp["properties"]["template"]
            if "scale" not in self.new_containerapp["properties"]["template"]:
                self.new_containerapp["properties"]["template"]["scale"] = {}
            if self.get_argument_min_replicas() is not None:
                self.new_containerapp["properties"]["template"]["scale"]["minReplicas"] = self.get_argument_min_replicas()
            if self.get_argument_max_replicas() is not None:
                self.new_containerapp["properties"]["template"]["scale"]["maxReplicas"] = self.get_argument_max_replicas()

        scale_def = None
        if self.get_argument_min_replicas() is not None or self.get_argument_max_replicas() is not None:
            scale_def = ScaleModel
            scale_def["minReplicas"] = self.get_argument_min_replicas()
            scale_def["maxReplicas"] = self.get_argument_max_replicas()
        # so we don't overwrite rules
        if safe_get(self.new_containerapp, "properties", "template", "scale", "rules"):
            self.new_containerapp["properties"]["template"]["scale"].pop(["rules"])
        scale_rule_type = self.get_argument_scale_rule_type()
        if self.get_argument_scale_rule_name():
            if not scale_rule_type:
                scale_rule_type = "http"
            scale_rule_type = scale_rule_type.lower()
            scale_rule_def = deepcopy(ScaleRuleModel)
            curr_metadata = {}
            if self.get_argument_scale_rule_http_concurrency():
                if scale_rule_type == 'http':
                    curr_metadata["concurrentRequests"] = str(self.get_argument_scale_rule_http_concurrency())
                elif scale_rule_type == 'tcp':
                    curr_metadata["concurrentConnections"] = str(self.get_argument_scale_rule_http_concurrency())
            metadata_def = parse_metadata_flags(self.get_argument_scale_rule_metadata(), curr_metadata)
            auth_def = parse_auth_flags(self.get_argument_scale_rule_auth())
            if scale_rule_type == "http":
                scale_rule_def["name"] = self.get_argument_scale_rule_name()
                scale_rule_def["custom"] = None
                scale_rule_def["tcp"] = None
                scale_rule_def["http"] = {}
                scale_rule_def["http"]["metadata"] = metadata_def
                scale_rule_def["http"]["auth"] = auth_def
            elif scale_rule_type == "tcp":
                scale_rule_def["name"] = self.get_argument_scale_rule_name()
                scale_rule_def["custom"] = None
                scale_rule_def["http"] = None
                scale_rule_def["tcp"] = {}
                scale_rule_def["tcp"]["metadata"] = metadata_def
                scale_rule_def["tcp"]["auth"] = auth_def
            else:
                scale_rule_def["name"] = self.get_argument_scale_rule_name()
                scale_rule_def["http"] = None
                scale_rule_def["tcp"] = None
                scale_rule_def["custom"] = {}
                scale_rule_def["custom"]["type"] = scale_rule_type
                scale_rule_def["custom"]["metadata"] = metadata_def
                scale_rule_def["custom"]["auth"] = auth_def
            if not scale_def:
                scale_def = ScaleModel
            scale_def["rules"] = [scale_rule_def]
            self.new_containerapp["properties"]["template"]["scale"]["rules"] = scale_def["rules"]
        # Ingress
        if update_map["ingress"]:
            self.new_containerapp["properties"]["configuration"] = {} if "configuration" not in self.new_containerapp[
                "properties"] else self.new_containerapp["properties"]["configuration"]
            if self.get_argument_target_port() is not None or self.get_argument_ingress() is not None:
                self.new_containerapp["properties"]["configuration"]["ingress"] = {}
                if self.get_argument_ingress():
                    self.new_containerapp["properties"]["configuration"]["ingress"][
                        "external"] = self.get_argument_ingress().lower() == "external"
                if self.get_argument_target_port():
                    self.new_containerapp["properties"]["configuration"]["ingress"]["targetPort"] = self.get_argument_target_port()

        # Registry
        if update_map["registry"]:  # pylint: disable=too-many-nested-blocks
            self.new_containerapp["properties"]["configuration"] = {} if "configuration" not in self.new_containerapp[
                "properties"] else self.new_containerapp["properties"]["configuration"]
            if "registries" in self.containerapp_def["properties"]["configuration"]:
                self.new_containerapp["properties"]["configuration"]["registries"] = self.containerapp_def["properties"]["configuration"]["registries"]
            if "registries" not in self.containerapp_def["properties"]["configuration"] or \
                    self.containerapp_def["properties"]["configuration"]["registries"] is None:
                self.new_containerapp["properties"]["configuration"]["registries"] = []

            registries_def = self.new_containerapp["properties"]["configuration"]["registries"]

            self.set_up_get_existing_secrets(self.containerapp_def)
            if "secrets" in self.containerapp_def["properties"]["configuration"] and self.containerapp_def["properties"]["configuration"]["secrets"]:
                self.new_containerapp["properties"]["configuration"]["secrets"] = self.containerapp_def["properties"]["configuration"]["secrets"]
            else:
                self.new_containerapp["properties"]["configuration"]["secrets"] = []

            if self.get_argument_registry_server():
                if (not self.get_argument_registry_pass() or not self.get_argument_registry_user()) and not self.get_argument_registry_identity():
                    if ACR_IMAGE_SUFFIX not in self.get_argument_registry_server():
                        raise RequiredArgumentMissingError(
                            'Registry url is required if using Azure Container Registry, otherwise Registry username and password are required if using Dockerhub')
                    logger.warning(
                        'No credential was provided to access Azure Container Registry. Trying to look up...')
                    parsed = urlparse(self.get_argument_registry_server())
                    registry_name = (parsed.netloc if parsed.scheme else parsed.path).split('.')[0]
                    registry_user, registry_pass, _ = _get_acr_cred(self.cmd.cli_ctx, registry_name)
                    self.set_argument_registry_user(registry_user)
                    self.set_argument_registry_pass(registry_pass)

                # Check if updating existing registry
                updating_existing_registry = False
                for r in registries_def:
                    if r['server'].lower() == self.get_argument_registry_server().lower():
                        updating_existing_registry = True
                        # registry (username and password) and identity are mutually exclusive, set identity to None when setting username or password
                        if (self.get_argument_registry_user() or self.get_argument_registry_pass()) and self.get_argument_registry_identity() is None:
                            r["identity"] = None
                        if self.get_argument_registry_user():
                            r["username"] = self.get_argument_registry_user()
                        if self.get_argument_registry_pass():
                            r["passwordSecretRef"] = store_as_secret_and_return_secret_ref(
                                self.new_containerapp["properties"]["configuration"]["secrets"],
                                r["username"],
                                r["server"],
                                self.get_argument_registry_pass(),
                                update_existing_secret=True,
                                disable_warnings=True)
                        if self.get_argument_registry_identity():
                            r["identity"] = self.get_argument_registry_identity()
                            if r["username"]:
                                _remove_registry_secret(containerapp_def=self.new_containerapp, server=r["server"], username=r["username"])
                            r["username"] = None
                            r["passwordSecretRef"] = None

                # If not updating existing registry, add as new registry
                if not updating_existing_registry:
                    registry = RegistryCredentialsModel
                    registry["server"] = self.get_argument_registry_server()
                    registry["username"] = self.get_argument_registry_user()
                    registry["identity"] = self.get_argument_registry_identity()
                    if self.get_argument_registry_pass():
                        registry["passwordSecretRef"] = store_as_secret_and_return_secret_ref(
                            self.new_containerapp["properties"]["configuration"]["secrets"],
                            self.get_argument_registry_user(),
                            self.get_argument_registry_server(),
                            self.get_argument_registry_pass(),
                            update_existing_secret=True,
                            disable_warnings=True)

                    registries_def.append(registry)

        if not self.get_argument_revision_suffix():
            safe_set(self.new_containerapp, "properties", "template", "revisionSuffix", value=None)

        if self.get_argument_revisions_mode():
            safe_set(self.new_containerapp, "properties", "configuration", "activeRevisionsMode", value=self.get_argument_revisions_mode())

        if self.get_argument_target_label():
            safe_set(self.new_containerapp, "properties", "configuration", "targetLabel", value=self.get_argument_target_label())