in src/containerapp/azext_containerapp/containerapp_decorator.py [0:0]
def construct_payload(self):
    """Populate ``self.new_containerapp`` with the update payload for a container app.

    When a YAML file argument is supplied, the whole job is delegated to
    ``set_up_update_containerapp_yaml`` and its result is returned.  Otherwise
    ``self.new_containerapp["properties"]`` is reset and filled section by
    section (tags, revision suffix, termination grace period, workload profile,
    containers, scale, ingress, registries, revisions mode, target label) from
    the CLI arguments, using ``self.containerapp_def`` as the existing state.

    Returns:
        The result of ``set_up_update_containerapp_yaml`` on the YAML path;
        ``None`` otherwise (the payload is left in ``self.new_containerapp``).

    Raises:
        ValidationError: the managed environment cannot be resolved,
            ``--container-name`` is needed but missing, or ``--image`` is
            missing when a new container is added.
        RequiredArgumentMissingError: a non-ACR registry server is given
            without both username and password (and without an identity).
    """
    # construct from yaml
    yaml_file = self.get_argument_yaml()
    if yaml_file:
        return self.set_up_update_containerapp_yaml(name=self.get_argument_name(), file_name=yaml_file)

    self.new_containerapp["properties"] = {}

    self.set_up_from_revision()

    # Doing this while API has bug. If env var is an empty string, API doesn't return "value"
    # even though the "value" should be an empty string
    for container in safe_get(self.containerapp_def, "properties", "template", "containers", default=[]):
        # `or []` also covers an explicit `"env": None`
        for env_var in container.get("env") or []:
            env_var.setdefault("value", "")

    min_replicas = self.get_argument_min_replicas()
    max_replicas = self.get_argument_max_replicas()
    scale_rule_name = self.get_argument_scale_rule_name()
    ingress = self.get_argument_ingress()
    target_port = self.get_argument_target_port()

    update_map = {}
    update_map['scale'] = min_replicas is not None or max_replicas is not None or scale_rule_name
    update_map['container'] = self._need_update_container()
    update_map['ingress'] = ingress or target_port
    update_map['registry'] = self.get_argument_registry_server() or self.get_argument_registry_user() or self.get_argument_registry_pass() or self.get_argument_registry_identity()

    if self.get_argument_tags():
        _add_or_update_tags(self.new_containerapp, self.get_argument_tags())

    if self.get_argument_revision_suffix() is not None:
        template_def = self.new_containerapp["properties"].setdefault("template", {})
        template_def["revisionSuffix"] = self.get_argument_revision_suffix()

    if self.get_argument_termination_grace_period() is not None:
        safe_set(self.new_containerapp, "properties", "template", "terminationGracePeriodSeconds",
                 value=self.get_argument_termination_grace_period())

    workload_profile_name = self.get_argument_workload_profile_name()
    if workload_profile_name:
        self.new_containerapp["properties"]["workloadProfileName"] = workload_profile_name

        # The workload profile must be validated against the app's managed environment.
        parsed_managed_env = parse_resource_id(self.containerapp_def["properties"]["environmentId"])
        managed_env_name = parsed_managed_env['name']
        managed_env_rg = parsed_managed_env['resource_group']
        managed_env_info = None
        try:
            managed_env_info = self.get_environment_client().show(cmd=self.cmd, resource_group_name=managed_env_rg, name=managed_env_name)
        except Exception as e:  # pylint: disable=broad-except
            # Classification of the error is delegated to the helper; a swallowed
            # error leaves managed_env_info as None and is reported just below.
            handle_non_404_status_code_exception(e)

        if not managed_env_info:
            raise ValidationError(
                "Error parsing the managed environment '{}' from the specified containerapp".format(
                    managed_env_name))

        ensure_workload_profile_supported(self.cmd, managed_env_name, managed_env_rg, workload_profile_name,
                                          managed_env_info)

    # Containers
    if update_map["container"]:
        template_def = self.new_containerapp["properties"].setdefault("template", {})
        template_def["containers"] = self.containerapp_def["properties"]["template"]["containers"]

        if not self.get_argument_container_name():
            # With exactly one container the target is unambiguous; otherwise the user must name it.
            if len(template_def["containers"]) == 1:
                self.set_argument_container_name(template_def["containers"][0]["name"])
            else:
                raise ValidationError(
                    "Usage error: --container-name is required when adding or updating a container")

        # Check if updating existing container
        updating_existing_container = self.set_up_existing_container_update()

        # If not updating existing container, add as new container
        if not updating_existing_container:
            image = self.get_argument_image()
            if image is None:
                raise ValidationError("Usage error: --image is required when adding a new container")

            # Model templates are module-level objects: copy before filling them in
            # so repeated calls in one process do not share or leak state.
            resources_def = None
            cpu = self.get_argument_cpu()
            memory = self.get_argument_memory()
            if cpu is not None or memory is not None:
                resources_def = deepcopy(ContainerResourcesModel)
                resources_def["cpu"] = cpu
                resources_def["memory"] = memory

            container_def = deepcopy(ContainerModel)
            container_def["name"] = self.get_argument_container_name()
            container_def["image"] = image
            container_def["env"] = []

            # env vars
            if self.get_argument_set_env_vars() is not None:
                _add_or_update_env_vars(container_def["env"], parse_env_var_flags(self.get_argument_set_env_vars()))

            if self.get_argument_replace_env_vars() is not None:
                _add_or_update_env_vars(container_def["env"], parse_env_var_flags(self.get_argument_replace_env_vars()))

            if self.get_argument_remove_env_vars() is not None:
                _remove_env_vars(container_def["env"], self.get_argument_remove_env_vars())

            if self.get_argument_remove_all_env_vars():
                container_def["env"] = []

            # An explicitly empty list means "clear the value", hence None.
            startup_command = self.get_argument_startup_command()
            if startup_command is not None:
                if isinstance(startup_command, list) and not startup_command:
                    container_def["command"] = None
                else:
                    container_def["command"] = startup_command

            container_args = self.get_argument_args()
            if container_args is not None:
                if isinstance(container_args, list) and not container_args:
                    container_def["args"] = None
                else:
                    container_def["args"] = container_args

            if resources_def is not None:
                container_def["resources"] = resources_def

            secret_volume_mount = self.get_argument_secret_volume_mount()
            if secret_volume_mount is not None:
                # Start from the app's current volumes; a missing or None entry is an empty list.
                volumes = self.containerapp_def["properties"]["template"].get("volumes") or []

                # generate a new volume name
                volume_def = deepcopy(VolumeModel)
                volume_mount_def = deepcopy(VolumeMountModel)
                volume_def["name"] = _generate_secret_volume_name()
                volume_def["storageType"] = "Secret"

                # mount the volume to the container
                volume_mount_def["volumeName"] = volume_def["name"]
                volume_mount_def["mountPath"] = secret_volume_mount
                container_def["volumeMounts"] = [volume_mount_def]

                volumes.append(volume_def)
                self.new_containerapp["properties"]["template"]["volumes"] = volumes

            self.new_containerapp["properties"]["template"]["containers"].append(container_def)

    # Scale
    if update_map["scale"]:
        template_def = self.new_containerapp["properties"].setdefault("template", {})
        if template_def.get("scale") is None:
            template_def["scale"] = {}
        if min_replicas is not None:
            template_def["scale"]["minReplicas"] = min_replicas
        if max_replicas is not None:
            template_def["scale"]["maxReplicas"] = max_replicas

    # NOTE(review): SOURCE indentation was lost; the rule handling below is placed at
    # method level (outside the `if update_map["scale"]` block above) -- confirm.
    # so we don't overwrite rules
    if safe_get(self.new_containerapp, "properties", "template", "scale", "rules"):
        # The key is the string "rules"; passing a list here raises TypeError (unhashable).
        self.new_containerapp["properties"]["template"]["scale"].pop("rules", None)

    if scale_rule_name:
        # Rule type defaults to "http" and is compared case-insensitively.
        scale_rule_type = (self.get_argument_scale_rule_type() or "http").lower()
        scale_rule_def = deepcopy(ScaleRuleModel)

        curr_metadata = {}
        http_concurrency = self.get_argument_scale_rule_http_concurrency()
        if http_concurrency:
            if scale_rule_type == 'http':
                curr_metadata["concurrentRequests"] = str(http_concurrency)
            elif scale_rule_type == 'tcp':
                curr_metadata["concurrentConnections"] = str(http_concurrency)
        metadata_def = parse_metadata_flags(self.get_argument_scale_rule_metadata(), curr_metadata)
        auth_def = parse_auth_flags(self.get_argument_scale_rule_auth())

        # Exactly one of http/tcp/custom is populated; the other two are nulled.
        scale_rule_def["name"] = scale_rule_name
        scale_rule_def["http"] = None
        scale_rule_def["tcp"] = None
        scale_rule_def["custom"] = None
        if scale_rule_type in ("http", "tcp"):
            scale_rule_def[scale_rule_type] = {"metadata": metadata_def, "auth": auth_def}
        else:
            scale_rule_def["custom"] = {
                "type": scale_rule_type,
                "metadata": metadata_def,
                "auth": auth_def,
            }

        self.new_containerapp["properties"]["template"]["scale"]["rules"] = [scale_rule_def]

    # Ingress
    if update_map["ingress"]:
        config_def = self.new_containerapp["properties"].setdefault("configuration", {})
        # update_map["ingress"] being truthy already implies one of the two arguments is set.
        config_def["ingress"] = {}
        if ingress:
            config_def["ingress"]["external"] = ingress.lower() == "external"
        if target_port:
            config_def["ingress"]["targetPort"] = target_port

    # Registry
    if update_map["registry"]:  # pylint: disable=too-many-nested-blocks
        config_def = self.new_containerapp["properties"].setdefault("configuration", {})

        # Reuse the app's registries list; a missing or None entry becomes an empty list.
        registries_def = self.containerapp_def["properties"]["configuration"].get("registries")
        if registries_def is None:
            registries_def = []
        config_def["registries"] = registries_def

        self.set_up_get_existing_secrets(self.containerapp_def)
        config_def["secrets"] = self.containerapp_def["properties"]["configuration"].get("secrets") or []

        registry_server = self.get_argument_registry_server()
        if registry_server:
            if (not self.get_argument_registry_pass() or not self.get_argument_registry_user()) and not self.get_argument_registry_identity():
                if ACR_IMAGE_SUFFIX not in registry_server:
                    raise RequiredArgumentMissingError(
                        'Registry url is required if using Azure Container Registry, otherwise Registry username and password are required if using Dockerhub')
                logger.warning(
                    'No credential was provided to access Azure Container Registry. Trying to look up...')
                parsed = urlparse(registry_server)
                # The registry name is the first label of the host, with or without a URL scheme.
                registry_name = (parsed.netloc if parsed.scheme else parsed.path).split('.')[0]
                registry_user, registry_pass, _ = _get_acr_cred(self.cmd.cli_ctx, registry_name)
                self.set_argument_registry_user(registry_user)
                self.set_argument_registry_pass(registry_pass)

            # Read after the lookup above, which may have just set user/password.
            registry_user = self.get_argument_registry_user()
            registry_pass = self.get_argument_registry_pass()
            registry_identity = self.get_argument_registry_identity()

            # Check if updating existing registry
            updating_existing_registry = False
            for r in registries_def:
                if r['server'].lower() != registry_server.lower():
                    continue
                updating_existing_registry = True
                # registry (username and password) and identity are mutually exclusive,
                # set identity to None when setting username or password
                if (registry_user or registry_pass) and registry_identity is None:
                    r["identity"] = None
                if registry_user:
                    r["username"] = registry_user
                if registry_pass:
                    r["passwordSecretRef"] = store_as_secret_and_return_secret_ref(
                        config_def["secrets"],
                        r["username"],
                        r["server"],
                        registry_pass,
                        update_existing_secret=True,
                        disable_warnings=True)
                if registry_identity:
                    r["identity"] = registry_identity
                    # .get: an identity-only entry may carry no "username" key at all.
                    if r.get("username"):
                        _remove_registry_secret(containerapp_def=self.new_containerapp, server=r["server"], username=r["username"])
                    r["username"] = None
                    r["passwordSecretRef"] = None

            # If not updating existing registry, add as new registry
            if not updating_existing_registry:
                registry = deepcopy(RegistryCredentialsModel)
                registry["server"] = registry_server
                registry["username"] = registry_user
                registry["identity"] = registry_identity
                if registry_pass:
                    registry["passwordSecretRef"] = store_as_secret_and_return_secret_ref(
                        config_def["secrets"],
                        registry_user,
                        registry_server,
                        registry_pass,
                        update_existing_secret=True,
                        disable_warnings=True)

                registries_def.append(registry)

    # Without an explicit suffix, revisionSuffix is set to None in the payload.
    if not self.get_argument_revision_suffix():
        safe_set(self.new_containerapp, "properties", "template", "revisionSuffix", value=None)

    if self.get_argument_revisions_mode():
        safe_set(self.new_containerapp, "properties", "configuration", "activeRevisionsMode", value=self.get_argument_revisions_mode())

    if self.get_argument_target_label():
        safe_set(self.new_containerapp, "properties", "configuration", "targetLabel", value=self.get_argument_target_label())