in src/sagemaker/local/entities.py [0:0]
def start(self, processing_inputs, processing_output_config, environment, processing_job_name):
    """Starts a local processing job.

    Validates the requested inputs and outputs against what Local Mode supports,
    then hands the job to ``self.container.process`` and updates the job's state
    and timestamps around that call.

    Args:
        processing_inputs: The processing input configuration (an iterable of
            input dicts). Each dict that passes validation is mutated in place:
            its ``DataUri`` key is set to the value of ``S3Input.S3Uri``.
        processing_output_config: The processing output configuration. When it
            is truthy and contains ``Outputs``, each output is validated.
        environment: The collection of environment variables passed to the job.
        processing_job_name: The processing job name.

    Raises:
        RuntimeError: If an input or output requests a feature that is not
            supported in Local Mode (DatasetDefinition, FeatureStoreOutput, or
            an unsupported input mode, distribution type, compression type, or
            upload mode).
        ValueError: If an input has no ``S3Input`` or an output has no
            ``S3Output``.
        KeyError: If an ``S3Input`` has no ``S3Uri``.
    """
    # (S3Input key, the only value Local Mode supports, label used in the error
    # message). An absent key is accepted; only an explicit other value fails.
    unsupported_input_settings = (
        ("S3InputMode", "File", "S3InputMode"),
        ("S3DataDistributionType", "FullyReplicated", "DataDistribution"),
        ("S3CompressionType", "None", "CompressionType"),
    )

    self.state = self._STARTING

    for item in processing_inputs:
        if "DatasetDefinition" in item:
            raise RuntimeError("DatasetDefinition is not currently supported in Local Mode")
        try:
            s3_input = item["S3Input"]
        except KeyError:
            raise ValueError("Processing input must have a valid ['S3Input']") from None

        # Read S3Uri up front so a missing URI is still reported before any
        # unsupported-setting error.
        data_uri = s3_input["S3Uri"]
        for key, supported, label in unsupported_input_settings:
            value = s3_input.get(key, supported)
            if value != supported:
                raise RuntimeError(
                    "%s: %s is not currently supported in Local Mode" % (label, value)
                )
        # Only mutate the caller's dict once this input has passed validation.
        item["DataUri"] = data_uri

    if processing_output_config and "Outputs" in processing_output_config:
        for item in processing_output_config["Outputs"]:
            if "FeatureStoreOutput" in item:
                raise RuntimeError("FeatureStoreOutput is not currently supported in Local Mode")
            try:
                s3_output = item["S3Output"]
            except KeyError:
                raise ValueError("Processing output must have a valid ['S3Output']") from None
            # Treat an absent upload mode like the absent input settings above:
            # only an explicit unsupported value is rejected.
            upload_mode = s3_output.get("S3UploadMode", "EndOfJob")
            if upload_mode != "EndOfJob":
                raise RuntimeError(
                    "UploadMode: %s is not currently supported in Local Mode." % upload_mode
                )

    self.start_time = datetime.datetime.now()
    self.state = self._PROCESSING
    self.processing_job_name = processing_job_name
    self.processing_inputs = processing_inputs
    self.processing_output_config = processing_output_config
    self.environment = environment

    # start() waits for this call to return before marking the job completed.
    # If it raises, the state stays at _PROCESSING and end_time is never set.
    self.container.process(
        processing_inputs, processing_output_config, environment, processing_job_name
    )

    self.end_time = datetime.datetime.now()
    self.state = self._COMPLETED