in src/stepfunctions/template/pipeline/inference.py [0:0]
def build_workflow_definition(self):
    """
    Build the workflow definition for the inference pipeline with all the states involved.

    The resulting chain runs these states in order: train the preprocessor,
    create the preprocessor model, transform the ``train`` input with it,
    train the estimator, create the pipeline model, configure the endpoint
    and deploy the endpoint. Every model, transform job, endpoint
    configuration and endpoint is named after ``self.pipeline_name``.

    Returns:
        :class:`~stepfunctions.steps.states.Chain`: Workflow definition as a chain of states involved in the inference pipeline.
    """
    default_name = self.pipeline_name

    # Preprocessor for feature transformation.
    # The preprocessor's own instance settings size its model and transform job.
    preprocessor_instance_type = self.preprocessor.instance_type
    preprocessor_instance_count = self.preprocessor.instance_count

    preprocessor_train_step = TrainingStep(
        StepId.TrainPreprocessor.value,
        estimator=self.preprocessor,
        job_name=default_name + '/preprocessor-source',
        data=self.inputs,
    )

    preprocessor_model = self.preprocessor.create_model()
    preprocessor_model_step = ModelStep(
        StepId.CreatePreprocessorModel.value,
        instance_type=preprocessor_instance_type,
        model=preprocessor_model,
        model_name=default_name
    )

    preprocessor_transform_step = TransformStep(
        StepId.TransformInput.value,
        transformer=self.preprocessor.transformer(
            instance_count=preprocessor_instance_count,
            instance_type=preprocessor_instance_type,
            max_payload=20
        ),
        job_name=default_name,
        model_name=default_name,
        data=self.inputs['train'],
        compression_type=self.compression_type,
        content_type=self.content_type
    )

    # Training.
    # From here on the estimator's instance settings are used, including for
    # the pipeline model and the endpoint configuration.
    estimator_instance_type = self.estimator.instance_type
    estimator_instance_count = self.estimator.instance_count

    training_step = TrainingStep(
        StepId.Train.value,
        estimator=self.estimator,
        job_name=default_name + '/estimator-source',
        data=self.inputs,
    )

    # Combine the preprocessor and estimator models into one pipeline model.
    pipeline_model = PipelineModel(
        name='PipelineModel',
        role=self.estimator.role,
        models=[
            self.preprocessor.create_model(),
            self.estimator.create_model()
        ]
    )

    # The step is constructed with the preprocessor model, but its parameters
    # are replaced on the very next statement with the configuration built
    # from ``pipeline_model``, so the state reflects the pipeline model.
    # NOTE(review): presumably the preprocessor model is only a placeholder
    # because ModelStep expects a single model object - confirm.
    pipeline_model_step = ModelStep(
        StepId.CreatePipelineModel.value,
        instance_type=estimator_instance_type,
        model=preprocessor_model,
        model_name=default_name
    )
    pipeline_model_step.parameters = self.pipeline_model_config(estimator_instance_type, pipeline_model)

    # Deployment
    endpoint_config_step = EndpointConfigStep(
        StepId.ConfigureEndpoint.value,
        endpoint_config_name=default_name,
        model_name=default_name,
        initial_instance_count=estimator_instance_count,
        instance_type=estimator_instance_type
    )

    deploy_step = EndpointStep(
        StepId.Deploy.value,
        endpoint_name=default_name,
        endpoint_config_name=default_name,
    )

    return Chain([
        preprocessor_train_step,
        preprocessor_model_step,
        preprocessor_transform_step,
        training_step,
        pipeline_model_step,
        endpoint_config_step,
        deploy_step
    ])