in src/azure-cli/azure/cli/command_modules/containerapp/containerapp_job_decorator.py [0:0]
def construct_payload(self):
    """Populate ``self.containerappjob_def`` from the command arguments.

    If a registry identity is given and it is not the system identity, an
    acrpull role assignment is attempted first. When a YAML file argument is
    supplied, the remaining work is delegated to
    ``set_up_create_containerapp_job_yaml`` and its result is returned.
    Otherwise the managed environment is looked up and the trigger, secret,
    registry, identity, container and template sections are assembled and
    written into ``self.containerappjob_def``; nothing is returned then.

    Raises:
        ValidationError: if the managed environment could not be retrieved.
    """
    if self.get_argument_registry_identity() and not is_registry_msi_system(self.get_argument_registry_identity()):
        logger.info("Creating an acrpull role assignment for the registry identity")
        create_acrpull_role_assignment(
            self.cmd,
            self.get_argument_registry_server(),
            self.get_argument_registry_identity(),
            skip_error=True)

    # A YAML definition short-circuits all argument-based construction below.
    if self.get_argument_yaml():
        return self.set_up_create_containerapp_job_yaml(
            name=self.get_argument_name(), file_name=self.get_argument_yaml())

    if not self.get_argument_image():
        self.set_argument_image(HELLO_WORLD_IMAGE)

    # Validate managed environment
    parsed_managed_env = parse_resource_id(self.get_argument_managed_env())
    managed_env_name = parsed_managed_env['name']
    managed_env_rg = parsed_managed_env['resource_group']
    managed_env_info = None

    try:
        managed_env_info = self.get_environment_client().show(
            cmd=self.cmd, resource_group_name=managed_env_rg, name=managed_env_name)
    except Exception as ex:  # pylint: disable=broad-except
        # Best-effort lookup: any failure is reported as a missing environment
        # below. Only ``Exception`` is caught so that KeyboardInterrupt and
        # SystemExit still propagate, and the cause is kept in the debug log.
        logger.debug("Failed to retrieve managed environment '%s': %s", self.get_argument_managed_env(), ex)

    if not managed_env_info:
        raise ValidationError(
            "The environment '{}' does not exist. Specify a valid environment".format(self.get_argument_managed_env()))

    location = managed_env_info["location"]
    _ensure_location_allowed(self.cmd, location, CONTAINER_APPS_RP, "jobs")

    # Fall back to the environment's default workload profile when none was given.
    if not self.get_argument_workload_profile_name() and "workloadProfiles" in managed_env_info:
        workload_profile_name = get_default_workload_profile_name_from_env(self.cmd, managed_env_info, managed_env_rg)
        # NOTE(review): the setter name is spelled "augument"; presumably it
        # matches the definition elsewhere in the class - confirm before renaming.
        self.set_augument_workload_profile_name(workload_profile_name)

    # The trigger type is compared case-insensitively, but the value sent in
    # the payload ("triggerType") is the argument exactly as supplied.
    trigger_type = self.get_argument_trigger_type()
    trigger_kind = trigger_type.lower() if trigger_type is not None else None

    # NOTE(review): each ``*Model`` name below is assigned directly, not
    # copied, so the item assignments mutate that same object. Presumably
    # these are module-level template dicts - confirm that sharing is intended.
    manualTriggerConfig_def = None
    if trigger_kind == "manual":
        manualTriggerConfig_def = ManualTriggerModel
        manualTriggerConfig_def["replicaCompletionCount"] = (
            0 if self.get_argument_replica_completion_count() is None
            else self.get_argument_replica_completion_count())
        manualTriggerConfig_def["parallelism"] = (
            0 if self.get_argument_parallelism() is None else self.get_argument_parallelism())

    scheduleTriggerConfig_def = None
    if trigger_kind == "schedule":
        scheduleTriggerConfig_def = ScheduleTriggerModel
        scheduleTriggerConfig_def["replicaCompletionCount"] = (
            0 if self.get_argument_replica_completion_count() is None
            else self.get_argument_replica_completion_count())
        scheduleTriggerConfig_def["parallelism"] = (
            0 if self.get_argument_parallelism() is None else self.get_argument_parallelism())
        scheduleTriggerConfig_def["cronExpression"] = self.get_argument_cron_expression()

    eventTriggerConfig_def = None
    if trigger_kind == "event":
        scale_def = None
        if (self.get_argument_min_executions() is not None or
                self.get_argument_max_executions() is not None or
                self.get_argument_polling_interval() is not None):
            scale_def = JobScaleModel
            scale_def["pollingInterval"] = self.get_argument_polling_interval()
            scale_def["minExecutions"] = self.get_argument_min_executions()
            scale_def["maxExecutions"] = self.get_argument_max_executions()

        if self.get_argument_scale_rule_name():
            # NOTE(review): this raises AttributeError if a rule name is given
            # without a rule type; presumably validated upstream - confirm.
            scale_rule_type = self.get_argument_scale_rule_type().lower()
            scale_rule_def = ScaleRuleModel
            curr_metadata = {}
            metadata_def = parse_metadata_flags(self.get_argument_scale_rule_metadata(), curr_metadata)
            auth_def = parse_auth_flags(self.get_argument_scale_rule_auth())
            scale_rule_def["name"] = self.get_argument_scale_rule_name()
            scale_rule_def["type"] = scale_rule_type
            scale_rule_def["metadata"] = metadata_def
            scale_rule_def["auth"] = auth_def
            if not scale_def:
                scale_def = JobScaleModel
            scale_def["rules"] = [scale_rule_def]

        # Unlike the manual/schedule triggers, None values are passed through
        # here rather than being replaced by 0.
        eventTriggerConfig_def = EventTriggerModel
        eventTriggerConfig_def["replicaCompletionCount"] = self.get_argument_replica_completion_count()
        eventTriggerConfig_def["parallelism"] = self.get_argument_parallelism()
        eventTriggerConfig_def["scale"] = scale_def

    secrets_def = None
    if self.get_argument_secrets() is not None:
        secrets_def = parse_secret_flags(self.get_argument_secrets())

    # Registry credentials are skipped entirely for the system identity; that
    # case is handled at the end of this method via set_managed_identity.
    registries_def = None
    if (self.get_argument_registry_server() is not None and
            not is_registry_msi_system(self.get_argument_registry_identity())):
        registries_def = RegistryCredentialsModel
        registries_def["server"] = self.get_argument_registry_server()

        # Infer credentials if not supplied and its azurecr
        if ((self.get_argument_registry_user() is None or self.get_argument_registry_pass() is None) and
                self.get_argument_registry_identity() is None):
            registry_user, registry_pass = _infer_acr_credentials(
                self.cmd, self.get_argument_registry_server(), self.get_argument_disable_warnings())
            self.set_argument_registry_user(registry_user)
            self.set_argument_registry_pass(registry_pass)

        if not self.get_argument_registry_identity():
            registries_def["username"] = self.get_argument_registry_user()

            # The registry password is stored as a secret, so a secrets list
            # must exist even when no --secrets argument was passed.
            if secrets_def is None:
                secrets_def = []
            registries_def["passwordSecretRef"] = store_as_secret_and_return_secret_ref(
                secrets_def,
                self.get_argument_registry_user(),
                self.get_argument_registry_server(),
                self.get_argument_registry_pass(),
                disable_warnings=self.get_argument_disable_warnings())
        else:
            registries_def["identity"] = self.get_argument_registry_identity()

    config_def = JobConfigurationModel
    config_def["secrets"] = secrets_def
    config_def["triggerType"] = trigger_type
    config_def["replicaTimeout"] = self.get_argument_replica_timeout()
    config_def["replicaRetryLimit"] = self.get_argument_replica_retry_limit()
    config_def["manualTriggerConfig"] = manualTriggerConfig_def
    config_def["scheduleTriggerConfig"] = scheduleTriggerConfig_def
    config_def["eventTriggerConfig"] = eventTriggerConfig_def
    config_def["registries"] = [registries_def] if registries_def is not None else None

    # Identity actions
    identity_def = ManagedServiceIdentityModel
    identity_def["type"] = "None"

    assign_system_identity = self.get_argument_system_assigned()
    if self.get_argument_user_assigned():
        assign_user_identities = [x.lower() for x in self.get_argument_user_assigned()]
    else:
        assign_user_identities = []

    if assign_system_identity and assign_user_identities:
        identity_def["type"] = "SystemAssigned, UserAssigned"
    elif assign_system_identity:
        identity_def["type"] = "SystemAssigned"
    elif assign_user_identities:
        identity_def["type"] = "UserAssigned"

    if assign_user_identities:
        identity_def["userAssignedIdentities"] = {}
        subscription_id = get_subscription_id(self.cmd.cli_ctx)

        for r in assign_user_identities:
            r = _ensure_identity_resource_id(subscription_id, self.get_argument_resource_group_name(), r)
            identity_def["userAssignedIdentities"][r] = {}  # pylint: disable=unsupported-assignment-operation

    resources_def = None
    if self.get_argument_cpu() is not None or self.get_argument_memory() is not None:
        resources_def = ContainerResourcesModel
        resources_def["cpu"] = self.get_argument_cpu()
        resources_def["memory"] = self.get_argument_memory()

    container_def = ContainerModel
    # The container name defaults to the job name.
    container_def["name"] = (
        self.get_argument_container_name() if self.get_argument_container_name() else self.get_argument_name())
    # With the system registry identity, HELLO_WORLD_IMAGE is used in place of
    # the requested image.
    container_def["image"] = (
        self.get_argument_image()
        if not is_registry_msi_system(self.get_argument_registry_identity())
        else HELLO_WORLD_IMAGE)
    if self.get_argument_env_vars() is not None:
        container_def["env"] = parse_env_var_flags(self.get_argument_env_vars())
    if self.get_argument_startup_command() is not None:
        container_def["command"] = self.get_argument_startup_command()
    if self.get_argument_args() is not None:
        container_def["args"] = self.get_argument_args()
    if resources_def is not None:
        container_def["resources"] = resources_def

    template_def = JobTemplateModel
    template_def["containers"] = [container_def]

    self.containerappjob_def["location"] = location
    self.containerappjob_def["identity"] = identity_def
    self.containerappjob_def["properties"]["environmentId"] = self.get_argument_managed_env()
    self.containerappjob_def["properties"]["configuration"] = config_def
    self.containerappjob_def["properties"]["template"] = template_def
    self.containerappjob_def["tags"] = self.get_argument_tags()

    if self.get_argument_workload_profile_name():
        self.containerappjob_def["properties"]["workloadProfileName"] = self.get_argument_workload_profile_name()
        ensure_workload_profile_supported(
            self.cmd,
            managed_env_name,
            managed_env_rg,
            self.get_argument_workload_profile_name(),
            managed_env_info)

    # Apply the registry identity to the job definition assembled above.
    if self.get_argument_registry_identity():
        if is_registry_msi_system(self.get_argument_registry_identity()):
            set_managed_identity(
                self.cmd,
                self.get_argument_resource_group_name(),
                self.containerappjob_def,
                system_assigned=True)
        else:
            set_managed_identity(
                self.cmd,
                self.get_argument_resource_group_name(),
                self.containerappjob_def,
                user_assigned=[self.get_argument_registry_identity()])