# src/cloud/pipelines/semantic_segmentation/pipeline.py
import os
import time

import boto3
import sagemaker
from sagemaker.inputs import TrainingInput
from sagemaker.processing import ProcessingInput, ProcessingOutput
from sagemaker.sklearn.processing import SKLearnProcessor
from sagemaker.tensorflow import TensorFlow
from sagemaker.workflow.parameters import ParameterInteger, ParameterString
from sagemaker.workflow.pipeline import Pipeline
from sagemaker.workflow.properties import PropertyFile
from sagemaker.workflow.step_collections import RegisterModel
from sagemaker.workflow.steps import CacheConfig, ProcessingStep, TrainingStep

BASE_DIR = os.path.dirname(os.path.realpath(__file__))
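
# `get_session` is not part of this excerpt; the helper below is a minimal
# sketch following the standard SageMaker project template, shown only so the
# module is self-contained. Adjust if the real module defines it differently.
def get_session(region, default_bucket):
    """Builds a sagemaker.session.Session for the given region and default bucket."""
    boto_session = boto3.Session(region_name=region)
    return sagemaker.session.Session(
        boto_session=boto_session,
        sagemaker_client=boto_session.client("sagemaker"),
        sagemaker_runtime_client=boto_session.client("sagemaker-runtime"),
        default_bucket=default_bucket,
    )
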
def get_pipeline(
region,
role=None,
default_bucket=None,
pipeline_name="defect-detection-semantic-segmentation-pipeline",
base_job_prefix="defect-detection-semantic-segmentation",
):
"""Gets a SageMaker ML Pipeline instance working with on DefectDetection data.
Args:
region: AWS region to create and run the pipeline.
role: IAM role to create and run steps and pipeline.
default_bucket: the bucket to use for storing the artifacts
Returns:
an instance of a pipeline
"""
    sagemaker_session = get_session(region, default_bucket)
    if role is None:
        role = sagemaker.session.get_execution_role(sagemaker_session)
    if default_bucket is None:
        # fall back to the session bucket so the training output_path below is a valid S3 URI
        default_bucket = sagemaker_session.default_bucket()
    # With caching enabled, a re-run with unchanged inputs lets steps that opt in
    # via cache_config (here, the training step) reuse their previous results.
cache_config = CacheConfig(enable_caching=True, expire_after="30d")
    ts = time.strftime('%Y-%m-%d-%H-%M-%S')  # timestamp for uniquifying names (currently unused)
# Data prep
processing_instance_type = ParameterString( # instance type for data preparation
name="ProcessingInstanceType",
default_value="ml.m5.xlarge"
)
processing_instance_count = ParameterInteger( # number of instances used for data preparation
name="ProcessingInstanceCount",
default_value=1
)
# Training
training_instance_type = ParameterString( # instance type for training the model
name="TrainingInstanceType",
default_value="ml.c5.xlarge"
)
training_instance_count = ParameterInteger( # number of instances used to train your model
name="TrainingInstanceCount",
default_value=1
)
    training_epochs = ParameterString(  # a string because SageMaker serializes hyperparameters as strings
        name="TrainingEpochs",
        default_value="100"
    )
# Dataset input data: S3 path
input_data = ParameterString(
name="InputData",
default_value="",
)
# Model Approval State
model_approval_status = ParameterString(
name="ModelApprovalStatus",
default_value="PendingManualApproval"
)
# Model package group name for registering in model registry
model_package_group_name = ParameterString(
name="ModelPackageGroupName",
default_value="defect-detection-semantic-segmentation-model-group"
)
    # The preprocessor: runs the preprocessing script in a managed scikit-learn container
    preprocessor = SKLearnProcessor(
        framework_version="0.23-1",
        role=role,
        instance_type=processing_instance_type,
        instance_count=processing_instance_count,
        max_runtime_in_seconds=7200,
        base_job_name=f"{base_job_prefix}/preprocessing",
        sagemaker_session=sagemaker_session,
    )
    # A property file exposing values from the preprocessing report to downstream steps
preprocessing_report = PropertyFile(
name='PreprocessingReport',
output_name='preprocessing_report',
path='preprocessing_report.json'
)
# Preprocessing Step
step_process = ProcessingStep(
name="DefectDetectionPreprocessing",
        code=os.path.join(BASE_DIR, 'preprocessing.py'),  # the preprocessing script shipped with this module
processor=preprocessor,
inputs=[
ProcessingInput(source=input_data, destination='/opt/ml/processing/input')
],
outputs=[
ProcessingOutput(output_name='train_data', source='/opt/ml/processing/train'),
ProcessingOutput(output_name='test_data', source='/opt/ml/processing/test'),
ProcessingOutput(output_name='val_data', source='/opt/ml/processing/val'),
ProcessingOutput(output_name='preprocessing_report', source='/opt/ml/processing/report')
],
        job_arguments=['--split', '0.1'],  # presumably the held-out split fraction consumed by preprocessing.py
property_files=[preprocessing_report]
)
model_dir = '/opt/ml/model'
hyperparameters = {'epochs': training_epochs, 'batch_size': 8, 'learning_rate': 0.0001}
    estimator = TensorFlow(
        source_dir=BASE_DIR,
        entry_point='train_tf.py',
        model_dir=model_dir,
        instance_type=training_instance_type,  # set to 'local' for local-mode debugging
        instance_count=training_instance_count,
        hyperparameters=hyperparameters,
        role=role,
        output_path='s3://{}/{}/{}/{}'.format(default_bucket, 'models', base_job_prefix, 'training-output'),
        framework_version='2.2.0',
        py_version='py37',
        # script mode is the default (and only) mode in SageMaker Python SDK v2,
        # so the legacy script_mode=True flag is dropped here
        sagemaker_session=sagemaker_session,
    )
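    # Referencing step_process.properties below (instead of literal S3 URIs) is
    # what creates the DAG edge: SageMaker runs the training step only after the
    # processing outputs exist.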
step_train = TrainingStep(
name="DefectDetectionSemanticSegmentationTrain",
estimator=estimator,
inputs={
"train": TrainingInput(
s3_data=step_process.properties.ProcessingOutputConfig.Outputs["train_data"].S3Output.S3Uri,
content_type='image/png',
s3_data_type='S3Prefix'
),
"validation": TrainingInput(
s3_data=step_process.properties.ProcessingOutputConfig.Outputs["val_data"].S3Output.S3Uri,
content_type='image/png',
s3_data_type='S3Prefix'
)
},
cache_config=cache_config
)
    # Register the trained model in the model registry; its approval status gates deployment
step_register = RegisterModel(
name="DefectDetectionSemanticSegmentationRegister",
estimator=estimator,
model_data=step_train.properties.ModelArtifacts.S3ModelArtifacts,
content_types=["image/png"],
response_types=["application/json"],
inference_instances=["ml.c5.2xlarge", "ml.p3.2xlarge"],
transform_instances=["ml.c5.xlarge"],
model_package_group_name=model_package_group_name,
approval_status=model_approval_status
)
pipeline = Pipeline(
name=pipeline_name,
parameters=[
processing_instance_type,
processing_instance_count,
training_instance_type,
training_instance_count,
training_epochs,
input_data,
model_approval_status,
model_package_group_name
],
steps=[step_process, step_train, step_register],
sagemaker_session=sagemaker_session,
)
return pipeline
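
# A minimal usage sketch (not part of the original module): the region, role
# ARN, bucket, and parameter overrides below are placeholders, not values from
# this repo.
#
#     pipeline = get_pipeline(
#         region="eu-west-1",
#         role="arn:aws:iam::111122223333:role/SageMakerExecutionRole",
#     )
#     pipeline.upsert(role_arn="arn:aws:iam::111122223333:role/SageMakerExecutionRole")
#     execution = pipeline.start(parameters={
#         "InputData": "s3://my-bucket/defect-detection/raw/",
#         "TrainingEpochs": "50",
#     })
#     execution.wait()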