def arguments()

in src/sagemaker/workflow/notebook_job_step.py [0:0]


    def arguments(self) -> RequestType:
        """Build the request dictionary used to create the underlying job.

        The method binds the session taken from the current step compilation
        context, resolves defaults and validates the inputs, stages the input
        notebook (plus the optional initialization script and additional
        dependencies) under ``s3_root_uri``, and finally assembles the request.

        Side effects:
            Sets ``self.sagemaker_session``, ``self._region_from_session`` and
            ``self._underlying_job_prefix``, and uploads the staged files
            through ``self._upload_job_files``.

        Returns:
            RequestType: A dictionary holding ``TrainingJobName``, ``RoleArn``,
            ``RetryStrategy``, ``StoppingCondition``,
            ``EnableInterContainerTrafficEncryption``,
            ``AlgorithmSpecification``, ``InputDataConfig``,
            ``OutputDataConfig``, ``ResourceConfig``, ``Tags`` and
            ``Environment``, plus ``VpcConfig`` and ``HyperParameters`` when
            the corresponding attributes are truthy.
        """

        compilation_ctx = load_step_compilation_context()
        # NOTE(review): the context is dereferenced unconditionally here, while
        # later branches still test it for truthiness -- confirm whether a
        # missing context is actually meant to be supported.
        self.sagemaker_session = compilation_ctx.sagemaker_session
        self._region_from_session = self.sagemaker_session.boto_region_name

        # Validation runs only after the defaults have been resolved.
        self._resolve_defaults()
        self._validate_inputs()

        # The prefix is what ends up as the job name in the request below.
        self._underlying_job_prefix = self._get_job_name_prefix(self.notebook_job_name)

        if compilation_ctx:
            pipeline_name = compilation_ctx.pipeline_name
        else:
            pipeline_name = self._underlying_job_prefix

        # --- Stage the job inputs on S3 -------------------------------------
        staged_input_uri = s3_path_join(
            self.s3_root_uri, pipeline_name, self.name, name_from_base("input")
        )

        files_to_stage = [self.input_notebook]
        if self.initialization_script:
            files_to_stage.append(self.initialization_script)
        if self.additional_dependencies:
            files_to_stage = files_to_stage + self.additional_dependencies

        self._upload_job_files(
            s3_base_uri=staged_input_uri,
            paths_to_upload=files_to_stage,
            kms_key=self.s3_kms_key,
            sagemaker_session=self.sagemaker_session,
        )

        # --- Assemble the job request ---------------------------------------
        job_request = {
            "TrainingJobName": self._underlying_job_prefix,
            "RoleArn": self.role,
            "RetryStrategy": {"MaximumRetryAttempts": self.max_retry_attempts},
            "StoppingCondition": {"MaxRuntimeInSeconds": self.max_runtime_in_seconds},
            "EnableInterContainerTrafficEncryption": self.encrypt_inter_container_traffic,
        }

        # Algorithm specification; container arguments are optional.
        algorithm = {
            "TrainingImage": self.image_uri,
            "TrainingInputMode": "File",
            "ContainerEntrypoint": self._scheduler_container_entry_point,
        }
        if self._scheduler_container_arguments:
            algorithm["ContainerArguments"] = self._scheduler_container_arguments
        job_request["AlgorithmSpecification"] = algorithm

        # Single input channel pointing at the folder staged above.
        s3_source = {
            "S3DataType": "S3Prefix",
            "S3Uri": staged_input_uri,
            "S3DataDistributionType": "FullyReplicated",
        }
        job_request["InputDataConfig"] = [
            {
                "ChannelName": "sagemaker_headless_execution_pipelinestep",
                "DataSource": {"S3DataSource": s3_source},
            },
        ]

        # Output location: with a compilation context the path embeds the
        # pipeline execution id variable; otherwise a plain "output" folder.
        if compilation_ctx:
            output_path_parts = [
                self.s3_root_uri,
                pipeline_name,
                ExecutionVariables.PIPELINE_EXECUTION_ID,
                self.name,
            ]
            staged_output_uri = Join("/", output_path_parts)
        else:
            staged_output_uri = s3_path_join(
                self.s3_root_uri, pipeline_name, self.name, "output"
            )

        output_data = {"S3OutputPath": staged_output_uri}
        if self.s3_kms_key is not None:
            output_data["KmsKeyId"] = self.s3_kms_key
        job_request["OutputDataConfig"] = output_data

        # Instance configuration; the instance count is fixed at one.
        resources = {
            "VolumeSizeInGB": self.volume_size,
            "InstanceCount": 1,
            "InstanceType": self.instance_type,
        }
        if self.volume_kms_key is not None:
            resources["VolumeKmsKeyId"] = self.volume_kms_key
        job_request["ResourceConfig"] = resources

        # Networking is only included when a VPC config is present.
        if self.vpc_config:
            job_request["VpcConfig"] = self.vpc_config

        job_request["Tags"] = self._prepare_tags()
        job_request["Environment"] = self._prepare_env_variables()

        # Notebook job parameters are passed through as hyperparameters.
        if self.parameters:
            job_request["HyperParameters"] = self.parameters

        return job_request