in src/sagemaker/processing.py [0:0]
def _normalize_inputs(self, inputs=None, kms_key=None):
    """Name every ``ProcessingInput`` and move local sources to S3.

    Inputs are handled in the order given. An input whose ``input_name`` is
    ``None`` is assigned ``input-<position>`` (positions start at 1). An input
    is passed through as-is when its ``source`` is a ``Properties`` object, when
    it has a truthy ``dataset_definition``, or when its S3 URI is a
    ``Parameter``, ``Expression`` or ``Properties`` object. For any other input
    whose URI scheme is not ``s3``, the URI is treated as a local path: it is
    uploaded beneath ``<default bucket>/<current job name>/input/<input name>``
    and the input's S3 URI is overwritten with the upload result.

    The ``ProcessingInput`` objects are modified in place; the returned list
    holds those same objects.

    Args:
        inputs (list[sagemaker.processing.ProcessingInput]): The
            ``ProcessingInput`` objects to normalize (default: None).
            If not specified, an empty list is returned.
        kms_key (str): The ARN of the KMS key forwarded to the S3 upload of
            local sources (default: None).

    Returns:
        list[sagemaker.processing.ProcessingInput]: The normalized
            ``ProcessingInput`` objects, in their original order.

    Raises:
        TypeError: if any element of ``inputs`` is not a ``ProcessingInput``.
    """
    if inputs is None:
        return []

    normalized = []
    for position, processing_input in enumerate(inputs, 1):
        if not isinstance(processing_input, ProcessingInput):
            raise TypeError("Your inputs must be provided as ProcessingInput objects.")

        # Unnamed inputs get a positional name so each one can be addressed.
        if processing_input.input_name is None:
            processing_input.input_name = "input-{}".format(position)

        # These inputs are kept untouched. The checks short-circuit left to
        # right, so ``s3_input`` is only read when the first two do not apply.
        keep_as_is = (
            isinstance(processing_input.source, Properties)
            or processing_input.dataset_definition
            or isinstance(processing_input.s3_input.s3_uri, (Parameter, Expression, Properties))
        )

        # Anything that is not already an s3:// URI is treated as a local path:
        # upload it and store the resulting S3 URI back on the input.
        if not keep_as_is and urlparse(processing_input.s3_input.s3_uri).scheme != "s3":
            upload_destination = s3.s3_path_join(
                "s3://",
                self.sagemaker_session.default_bucket(),
                self._current_job_name,
                "input",
                processing_input.input_name,
            )
            processing_input.s3_input.s3_uri = s3.S3Uploader.upload(
                local_path=processing_input.s3_input.s3_uri,
                desired_s3_uri=upload_destination,
                sagemaker_session=self.sagemaker_session,
                kms_key=kms_key,
            )

        normalized.append(processing_input)

    return normalized