in src/stepfunctions/template/pipeline/train.py [0:0]
def execute(self, job_name=None, hyperparameters=None):
"""
Run the training pipeline.
Args:
job_name (str, optional): Name for the training job. If one is not provided, a job name will be auto-generated. (default: None)
hyperparameters (dict, optional): Hyperparameters for the estimator training. (default: None)
Returns:
:py:class:`~stepfunctions.workflow.Execution`: Running instance of the training pipeline.
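
    Example:
        A minimal usage sketch, assuming ``pipeline`` is an already-constructed
        instance of this pipeline class; the job name and hyperparameter keys
        below are illustrative placeholders, not values defined in this module::

            execution = pipeline.execute(
                job_name='my-training-run',
                hyperparameters={'epochs': 10, 'learning_rate': 0.01},
            )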
"""
    inputs = self.input_template.copy()

    if hyperparameters is not None:
        # SageMaker expects hyperparameter values to be strings
        inputs[StepId.Train.value]['HyperParameters'] = {
            k: str(v) for k, v in hyperparameters.items()
        }

    if job_name is None:
        job_name = '{base_name}-{timestamp}'.format(base_name='training-pipeline', timestamp=self._generate_timestamp())

    # Configure training and model
    inputs[StepId.Train.value]['TrainingJobName'] = 'estimator-' + job_name
    inputs[StepId.Train.value]['OutputDataConfig']['S3OutputPath'] = 's3://{s3_bucket}/{pipeline_name}/models'.format(
        s3_bucket=self.s3_bucket,
        pipeline_name=self.workflow.name
    )
    inputs[StepId.CreateModel.value]['ModelName'] = job_name

    # Configure endpoint
    inputs[StepId.ConfigureEndpoint.value]['EndpointConfigName'] = job_name
    for variant in inputs[StepId.ConfigureEndpoint.value]['ProductionVariants']:
        variant['ModelName'] = job_name
    inputs[StepId.Deploy.value]['EndpointConfigName'] = job_name
    inputs[StepId.Deploy.value]['EndpointName'] = job_name

    # Configure the path to the model artifact produced by the training job
    inputs[StepId.CreateModel.value]['PrimaryContainer']['ModelDataUrl'] = '{s3_uri}/{job}/output/model.tar.gz'.format(
        s3_uri=inputs[StepId.Train.value]['OutputDataConfig']['S3OutputPath'],
        job=inputs[StepId.Train.value]['TrainingJobName']
    )

    return self.workflow.execute(inputs=inputs, name=job_name)
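
# For illustration only (the bucket, workflow, and job names below are assumed,
# not defined in this module): with s3_bucket='my-bucket', a workflow named
# 'MyTrainingPipeline', and job_name='my-training-run', the wiring above resolves to:
#   TrainingJobName: 'estimator-my-training-run'
#   S3OutputPath:    's3://my-bucket/MyTrainingPipeline/models'
#   ModelDataUrl:    's3://my-bucket/MyTrainingPipeline/models/estimator-my-training-run/output/model.tar.gz'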