def parent_construct_payload()

in src/containerapp/azext_containerapp/containerapp_job_decorator.py [0:0]


    def parent_construct_payload(self):
        """Populate ``self.containerappjob_def`` from the command arguments.

        When a YAML file argument is supplied, the work is delegated to
        ``set_up_create_containerapp_job_yaml`` and its result is returned
        immediately. Otherwise the payload is assembled from the individual
        arguments: the managed environment is looked up (its location is
        reused for the job), then the trigger configuration, secrets,
        registry credentials, identity, container and template sections are
        built and stored on ``self.containerappjob_def``.

        Returns:
            The result of ``set_up_create_containerapp_job_yaml`` when a YAML
            file is given; otherwise ``None`` (the payload is written to
            ``self.containerappjob_def`` in place).

        Raises:
            ValidationError: if the managed environment lookup fails or
                returns nothing.
        """
        # preview logic
        self.check_create_acrpull_role_assignment()
        # end preview logic

        if self.get_argument_yaml():
            return self.set_up_create_containerapp_job_yaml(name=self.get_argument_name(), file_name=self.get_argument_yaml())

        # Fall back to the default image when none was supplied.
        if not self.get_argument_image():
            self.set_argument_image(HELLO_WORLD_IMAGE)

        # Validate managed environment
        parsed_managed_env = parse_resource_id(self.get_argument_managed_env())
        managed_env_name = parsed_managed_env['name']
        managed_env_rg = parsed_managed_env['resource_group']
        managed_env_info = None

        # Best-effort lookup: any ordinary failure is reported below as
        # "environment does not exist". Only Exception is caught so that
        # KeyboardInterrupt / SystemExit still propagate to the caller.
        try:
            managed_env_info = self.get_environment_client().show(cmd=self.cmd, resource_group_name=managed_env_rg, name=managed_env_name)
        except Exception:  # pylint: disable=broad-except
            pass

        if not managed_env_info:
            raise ValidationError(
                "The environment '{}' does not exist. Specify a valid environment".format(self.get_argument_managed_env()))

        # The job is created in the same location as its environment.
        location = managed_env_info["location"]
        _ensure_location_allowed(self.cmd, location, CONTAINER_APPS_RP, "jobs")

        # If the environment has workload profiles and none was requested,
        # pick the environment's default one.
        if not self.get_argument_workload_profile_name() and "workloadProfiles" in managed_env_info:
            workload_profile_name = get_default_workload_profile_name_from_env(self.cmd, managed_env_info, managed_env_rg)
            # NOTE(review): "augument" is the setter name this code calls; verify
            # against the class definition before renaming.
            self.set_augument_workload_profile_name(workload_profile_name)

        # NOTE(review): the *Model objects below are assigned and mutated
        # directly rather than copied. If they are shared module-level
        # templates, values set here persist across calls - confirm.

        # The trigger type is matched case-insensitively; exactly one of the
        # three trigger configurations is populated, the others stay None.
        trigger_type = self.get_argument_trigger_type()
        trigger_kind = trigger_type.lower() if trigger_type is not None else None

        manualTriggerConfig_def = None
        scheduleTriggerConfig_def = None
        eventTriggerConfig_def = None

        if trigger_kind == "manual":
            # Manual triggers default both counters to 0 when not supplied.
            manualTriggerConfig_def = ManualTriggerModel
            manualTriggerConfig_def[
                "replicaCompletionCount"] = 0 if self.get_argument_replica_completion_count() is None else self.get_argument_replica_completion_count()
            manualTriggerConfig_def["parallelism"] = 0 if self.get_argument_parallelism() is None else self.get_argument_parallelism()
        elif trigger_kind == "schedule":
            # Schedule triggers use the same 0 defaults plus the cron expression.
            scheduleTriggerConfig_def = ScheduleTriggerModel
            scheduleTriggerConfig_def[
                "replicaCompletionCount"] = 0 if self.get_argument_replica_completion_count() is None else self.get_argument_replica_completion_count()
            scheduleTriggerConfig_def["parallelism"] = 0 if self.get_argument_parallelism() is None else self.get_argument_parallelism()
            scheduleTriggerConfig_def["cronExpression"] = self.get_argument_cron_expression()
        elif trigger_kind == "event":
            # A scale section is only created when at least one scale-related
            # argument (or a scale rule) was supplied.
            scale_def = None
            if self.get_argument_min_executions() is not None or self.get_argument_max_executions() is not None or self.get_argument_polling_interval() is not None:
                scale_def = JobScaleModel
                scale_def["pollingInterval"] = self.get_argument_polling_interval()
                scale_def["minExecutions"] = self.get_argument_min_executions()
                scale_def["maxExecutions"] = self.get_argument_max_executions()

            if self.get_argument_scale_rule_name():
                # NOTE(review): this raises AttributeError if the scale rule type
                # is None - presumably validated before this point; confirm.
                scale_rule_type = self.get_argument_scale_rule_type().lower()
                scale_rule_def = ScaleRuleModel
                curr_metadata = {}
                metadata_def = parse_metadata_flags(self.get_argument_scale_rule_metadata(), curr_metadata)
                auth_def = parse_auth_flags(self.get_argument_scale_rule_auth())
                scale_rule_def["name"] = self.get_argument_scale_rule_name()
                scale_rule_def["type"] = scale_rule_type
                scale_rule_def["metadata"] = metadata_def
                scale_rule_def["auth"] = auth_def

                if not scale_def:
                    scale_def = JobScaleModel
                scale_def["rules"] = [scale_rule_def]

            # Unlike manual/schedule, event triggers pass the counters through
            # unchanged (None is not replaced by 0).
            eventTriggerConfig_def = EventTriggerModel
            eventTriggerConfig_def["replicaCompletionCount"] = self.get_argument_replica_completion_count()
            eventTriggerConfig_def["parallelism"] = self.get_argument_parallelism()
            eventTriggerConfig_def["scale"] = scale_def

        secrets_def = None
        if self.get_argument_secrets() is not None:
            secrets_def = parse_secret_flags(self.get_argument_secrets())

        # Registry credentials are skipped here when the registry identity is
        # the system-assigned one (as reported by is_registry_msi_system).
        registries_def = None
        if self.get_argument_registry_server() is not None and not is_registry_msi_system(self.get_argument_registry_identity()):
            registries_def = RegistryCredentialsModel
            registries_def["server"] = self.get_argument_registry_server()

            # Infer credentials if not supplied and its azurecr
            if (self.get_argument_registry_user() is None or self.get_argument_registry_pass() is None) and self.get_argument_registry_identity() is None:
                registry_user, registry_pass = _infer_acr_credentials(self.cmd, self.get_argument_registry_server(), self.get_argument_disable_warnings())
                self.set_argument_registry_user(registry_user)
                self.set_argument_registry_pass(registry_pass)

            if not self.get_argument_registry_identity():
                # Username/password auth: the password is stored as a secret and
                # referenced by name from the registry entry.
                registries_def["username"] = self.get_argument_registry_user()

                if secrets_def is None:
                    secrets_def = []
                registries_def["passwordSecretRef"] = store_as_secret_and_return_secret_ref(secrets_def, self.get_argument_registry_user(),
                                                                                            self.get_argument_registry_server(),
                                                                                            self.get_argument_registry_pass(),
                                                                                            disable_warnings=self.get_argument_disable_warnings())
            else:
                registries_def["identity"] = self.get_argument_registry_identity()

        config_def = JobConfigurationModel
        config_def["secrets"] = secrets_def
        config_def["triggerType"] = trigger_type
        config_def["replicaTimeout"] = self.get_argument_replica_timeout()
        config_def["replicaRetryLimit"] = self.get_argument_replica_retry_limit()
        config_def["manualTriggerConfig"] = manualTriggerConfig_def
        config_def["scheduleTriggerConfig"] = scheduleTriggerConfig_def
        config_def["eventTriggerConfig"] = eventTriggerConfig_def
        config_def["registries"] = [registries_def] if registries_def is not None else None

        # Identity actions
        identity_def = ManagedServiceIdentityModel
        identity_def["type"] = "None"

        assign_system_identity = self.get_argument_system_assigned()
        if self.get_argument_user_assigned():
            assign_user_identities = [x.lower() for x in self.get_argument_user_assigned()]
        else:
            assign_user_identities = []

        if assign_system_identity and assign_user_identities:
            identity_def["type"] = "SystemAssigned, UserAssigned"
        elif assign_system_identity:
            identity_def["type"] = "SystemAssigned"
        elif assign_user_identities:
            identity_def["type"] = "UserAssigned"

        if assign_user_identities:
            identity_def["userAssignedIdentities"] = {}
            subscription_id = get_subscription_id(self.cmd.cli_ctx)

            # Each identity is passed through _ensure_identity_resource_id and
            # the result is used as a key with an empty-dict value.
            for r in assign_user_identities:
                r = _ensure_identity_resource_id(subscription_id, self.get_argument_resource_group_name(), r)
                identity_def["userAssignedIdentities"][r] = {}  # pylint: disable=unsupported-assignment-operation

        resources_def = None
        if self.get_argument_cpu() is not None or self.get_argument_memory() is not None:
            resources_def = ContainerResourcesModel
            resources_def["cpu"] = self.get_argument_cpu()
            resources_def["memory"] = self.get_argument_memory()

        container_def = ContainerModel
        # The container name defaults to the job name.
        container_def["name"] = self.get_argument_container_name() if self.get_argument_container_name() else self.get_argument_name()
        # With a system-assigned registry identity the default image is used
        # here instead of the requested one - presumably the real image is
        # applied later by the registry-identity setup; confirm.
        container_def["image"] = self.get_argument_image() if not is_registry_msi_system(self.get_argument_registry_identity()) else HELLO_WORLD_IMAGE
        if self.get_argument_env_vars() is not None:
            container_def["env"] = parse_env_var_flags(self.get_argument_env_vars())
        if self.get_argument_startup_command() is not None:
            container_def["command"] = self.get_argument_startup_command()
        if self.get_argument_args() is not None:
            container_def["args"] = self.get_argument_args()
        if resources_def is not None:
            container_def["resources"] = resources_def

        template_def = JobTemplateModel
        template_def["containers"] = [container_def]

        self.containerappjob_def["location"] = location
        self.containerappjob_def["identity"] = identity_def
        self.containerappjob_def["properties"]["environmentId"] = self.get_argument_managed_env()
        self.containerappjob_def["properties"]["configuration"] = config_def
        self.containerappjob_def["properties"]["template"] = template_def
        self.containerappjob_def["tags"] = self.get_argument_tags()

        if self.get_argument_workload_profile_name():
            self.containerappjob_def["properties"]["workloadProfileName"] = self.get_argument_workload_profile_name()
            ensure_workload_profile_supported(self.cmd, managed_env_name, managed_env_rg, self.get_argument_workload_profile_name(),
                                              managed_env_info)

        # preview logic
        self.set_up_registry_identity()