# src/stepfunctions/template/pipeline/inference.py
def execute(self, job_name=None, hyperparameters=None):
    """
    Run the inference pipeline.

    Args:
        job_name (str, optional): Name for the training job. This is also used as the suffix for the preprocessing job name, which becomes `preprocessor-<job_name>`. If one is not provided, a job name will be auto-generated. (default: None)
        hyperparameters (dict, optional): Hyperparameters for the estimator training. (default: None)

    Returns:
        :py:class:`~stepfunctions.workflow.Execution`: Running instance of the inference pipeline.
    """
    inputs = self.input_template.copy()

    if hyperparameters is not None:
        inputs[StepId.Train.value]['HyperParameters'] = hyperparameters

    if job_name is None:
        job_name = '{base_name}-{timestamp}'.format(base_name='inference-pipeline', timestamp=self._generate_timestamp())
    # Configure preprocessor
    inputs[StepId.TrainPreprocessor.value]['TrainingJobName'] = 'preprocessor-' + job_name
    inputs[StepId.TrainPreprocessor.value]['OutputDataConfig']['S3OutputPath'] = 's3://{s3_bucket}/{pipeline_name}/models'.format(
        s3_bucket=self.s3_bucket,
        pipeline_name=self.workflow.name
    )
    inputs[StepId.TrainPreprocessor.value]['DebugHookConfig']['S3OutputPath'] = 's3://{s3_bucket}/{pipeline_name}/models/debug'.format(
        s3_bucket=self.s3_bucket,
        pipeline_name=self.workflow.name
    )
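    # SageMaker training jobs write their artifact to
    # <S3OutputPath>/<TrainingJobName>/output/model.tar.gz, so the preprocessor
    # model is created from that location.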
    inputs[StepId.CreatePreprocessorModel.value]['PrimaryContainer']['ModelDataUrl'] = '{s3_uri}/{job}/output/model.tar.gz'.format(
        s3_uri=inputs[StepId.TrainPreprocessor.value]['OutputDataConfig']['S3OutputPath'],
        job=inputs[StepId.TrainPreprocessor.value]['TrainingJobName']
    )
    inputs[StepId.CreatePreprocessorModel.value]['ModelName'] = inputs[StepId.TrainPreprocessor.value]['TrainingJobName']
    inputs[StepId.TransformInput.value]['ModelName'] = inputs[StepId.CreatePreprocessorModel.value]['ModelName']
    inputs[StepId.TransformInput.value]['TransformJobName'] = inputs[StepId.CreatePreprocessorModel.value]['ModelName']
    inputs[StepId.TransformInput.value]['TransformOutput']['S3OutputPath'] = 's3://{s3_bucket}/{pipeline_name}/{transform_job}/transform'.format(
        s3_bucket=self.s3_bucket,
        pipeline_name=self.workflow.name,
        transform_job='preprocessor-transform-' + job_name
    )
    self.replace_sagemaker_job_name(inputs[StepId.TrainPreprocessor.value], inputs[StepId.TrainPreprocessor.value]['TrainingJobName'])
    # Configure training and model
    inputs[StepId.Train.value]['TrainingJobName'] = 'estimator-' + job_name
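    # The estimator trains on the preprocessor's batch transform output, wired in
    # here as the 'train' channel.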
    inputs[StepId.Train.value]['InputDataConfig'] = [{
        'ChannelName': 'train',
        'DataSource': {
            'S3DataSource': {
                'S3DataDistributionType': 'FullyReplicated',
                'S3DataType': 'S3Prefix',
                'S3Uri': '{s3_uri}'.format(
                    s3_uri=inputs[StepId.TransformInput.value]['TransformOutput']['S3OutputPath']
                )
            }
        }
    }]
    inputs[StepId.Train.value]['OutputDataConfig']['S3OutputPath'] = 's3://{s3_bucket}/{pipeline_name}/models'.format(
        s3_bucket=self.s3_bucket,
        pipeline_name=self.workflow.name
    )
    inputs[StepId.Train.value]['DebugHookConfig']['S3OutputPath'] = 's3://{s3_bucket}/{pipeline_name}/models/debug'.format(
        s3_bucket=self.s3_bucket,
        pipeline_name=self.workflow.name
    )
    inputs[StepId.CreatePipelineModel.value]['ModelName'] = job_name
    self.replace_sagemaker_job_name(inputs[StepId.Train.value], inputs[StepId.Train.value]['TrainingJobName'])
    # Configure pipeline model
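    # Container 0 serves the preprocessor model and container 1 serves the trained
    # estimator, so inference requests pass through preprocessing before prediction.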
    inputs[StepId.CreatePipelineModel.value]['Containers'][0]['ModelDataUrl'] = inputs[StepId.CreatePreprocessorModel.value]['PrimaryContainer']['ModelDataUrl']
    inputs[StepId.CreatePipelineModel.value]['Containers'][1]['ModelDataUrl'] = '{s3_uri}/{job}/output/model.tar.gz'.format(
        s3_uri=inputs[StepId.Train.value]['OutputDataConfig']['S3OutputPath'],
        job=inputs[StepId.Train.value]['TrainingJobName']
    )
    # Configure endpoint
    inputs[StepId.ConfigureEndpoint.value]['EndpointConfigName'] = job_name
    for variant in inputs[StepId.ConfigureEndpoint.value]['ProductionVariants']:
        variant['ModelName'] = job_name
    inputs[StepId.Deploy.value]['EndpointConfigName'] = job_name
    inputs[StepId.Deploy.value]['EndpointName'] = job_name

    return self.workflow.execute(inputs=inputs, name=job_name)
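# Usage sketch (assumes this method lives on the module's pipeline template class,
# e.g. an inference pipeline built from a preprocessor and an estimator, and that
# the Step Functions workflow has already been created; the job name and
# hyperparameters below are hypothetical):
#
#     execution = pipeline.execute(
#         job_name='customer-churn',
#         hyperparameters={'max_depth': '5'},
#     )
#     result = execution.get_output(wait=True)  # block until the state machine finishes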