# src/sagemaker/workflow/notebook_job_step.py
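# Module-level imports this excerpt relies on, listed here so the snippet is
# self-contained (import paths as exposed by the SageMaker Python SDK).
from sagemaker.s3_utils import s3_path_join
from sagemaker.utils import name_from_base
from sagemaker.workflow.entities import RequestType
from sagemaker.workflow.execution_variables import ExecutionVariables
from sagemaker.workflow.functions import Join
from sagemaker.workflow.utilities import load_step_compilation_context
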
def arguments(self) -> RequestType:
"""Generates the arguments dictionary that is used to create the job."""
    step_compilation_context = load_step_compilation_context()

    self.sagemaker_session = step_compilation_context.sagemaker_session
    self._region_from_session = self.sagemaker_session.boto_region_name

    self._resolve_defaults()
    # validate the inputs after resolving defaults
    self._validate_inputs()

    # generate the training job name prefix, which identifies the underlying training job
    self._underlying_job_prefix = self._get_job_name_prefix(self.notebook_job_name)
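    # when compiling outside of a pipeline context, fall back to the job-name
    # prefix as the S3 namespace for this step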
    pipeline_name = (
        step_compilation_context.pipeline_name
        if step_compilation_context
        else self._underlying_job_prefix
    )

    # step 1 - prepare the staged input and upload it to S3
    input_staged_folder_s3_uri = s3_path_join(
        self.s3_root_uri,
        pipeline_name,
        self.name,
        name_from_base("input"),
    )
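    # collect the local files to stage: the notebook itself, plus the optional
    # initialization script and any additional dependencies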
    upload_list = [self.input_notebook]
    if self.initialization_script:
        upload_list.append(self.initialization_script)

    if self.additional_dependencies:
        upload_list = upload_list + self.additional_dependencies

    self._upload_job_files(
        s3_base_uri=input_staged_folder_s3_uri,
        paths_to_upload=upload_list,
        kms_key=self.s3_kms_key,
        sagemaker_session=self.sagemaker_session,
    )

    # step 2 - compose the job request
    request_dict = dict(
        TrainingJobName=self._underlying_job_prefix,
        RoleArn=self.role,
        RetryStrategy={"MaximumRetryAttempts": self.max_retry_attempts},
        StoppingCondition={
            "MaxRuntimeInSeconds": self.max_runtime_in_seconds,
        },
        EnableInterContainerTrafficEncryption=self.encrypt_inter_container_traffic,
    )
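    # the notebook job is backed by a training job; the scheduler's container
    # entrypoint drives the headless notebook execution inside the image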
    # training algorithm config
    algorithm_spec = dict(
        TrainingImage=self.image_uri,
        TrainingInputMode="File",
        ContainerEntrypoint=self._scheduler_container_entry_point,
    )
    if self._scheduler_container_arguments:
        algorithm_spec["ContainerArguments"] = self._scheduler_container_arguments
    request_dict["AlgorithmSpecification"] = algorithm_spec

    # training input channel
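    # the staged S3 prefix is downloaded into the training container through a
    # single fully replicated input channel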
    input_data_config = [
        {
            "ChannelName": "sagemaker_headless_execution_pipelinestep",
            "DataSource": {
                "S3DataSource": {
                    "S3DataType": "S3Prefix",
                    "S3Uri": input_staged_folder_s3_uri,
                    "S3DataDistributionType": "FullyReplicated",
                }
            },
        },
    ]
    request_dict["InputDataConfig"] = input_data_config

    # training output
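    # inside a pipeline compilation context the execution id is only known at
    # runtime, so the output path is built with the Join pipeline function;
    # otherwise a plain S3 path is composed up front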
    if step_compilation_context:
        output_staged_folder_s3_uri = Join(
            "/",
            [
                self.s3_root_uri,
                pipeline_name,
                ExecutionVariables.PIPELINE_EXECUTION_ID,
                self.name,
            ],
        )
    else:
        output_staged_folder_s3_uri = s3_path_join(
            self.s3_root_uri,
            pipeline_name,
            self.name,
            "output",
        )

    output_config = {"S3OutputPath": output_staged_folder_s3_uri}
    if self.s3_kms_key is not None:
        output_config["KmsKeyId"] = self.s3_kms_key
    request_dict["OutputDataConfig"] = output_config

    # instance config
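    # a single instance is always requested; only instance type, volume size,
    # and the volume KMS key are configurable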
    resource_config = dict(
        VolumeSizeInGB=self.volume_size,
        InstanceCount=1,
        InstanceType=self.instance_type,
    )
    if self.volume_kms_key is not None:
        resource_config["VolumeKmsKeyId"] = self.volume_kms_key
    request_dict["ResourceConfig"] = resource_config

    # network
    if self.vpc_config:
        request_dict["VpcConfig"] = self.vpc_config

    # tags
    request_dict["Tags"] = self._prepare_tags()

    # environment variables
    request_dict["Environment"] = self._prepare_env_variables()

    # notebook job parameters
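    # notebook parameters are delivered to the container as training job
    # hyperparameters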
    if self.parameters:
        request_dict["HyperParameters"] = self.parameters

    return request_dict
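
# ---------------------------------------------------------------------------
# Hedged usage sketch (illustration only, not part of the original module).
# It shows how the request dictionary built by `arguments` is exercised in
# practice: a NotebookJobStep is added to a Pipeline, and the compiled step
# creates the underlying training job when the pipeline executes. The bucket,
# role ARN, image URI, notebook path, and the `kernel_name` argument below are
# placeholders/assumptions, not values taken from this file.
# ---------------------------------------------------------------------------
# from sagemaker.workflow.notebook_job_step import NotebookJobStep
# from sagemaker.workflow.pipeline import Pipeline
#
# nb_step = NotebookJobStep(
#     name="nightly-report",                      # becomes `self.name` above
#     input_notebook="notebooks/report.ipynb",    # staged to the input S3 prefix
#     image_uri="<sagemaker-distribution-image-uri>",
#     kernel_name="python3",
#     role="arn:aws:iam::111122223333:role/SageMakerExecutionRole",
#     instance_type="ml.m5.xlarge",
#     parameters={"run_date": "2024-01-01"},      # surfaced as HyperParameters
# )
#
# pipeline = Pipeline(name="notebook-job-pipeline", steps=[nb_step])
# pipeline.upsert(role_arn="arn:aws:iam::111122223333:role/SageMakerExecutionRole")
# pipeline.start()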