# 08_projects/modelbuild/pipelines/endtoendmlsm/workflow.py
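# NOTE: a minimal sketch of the imports and helpers this module relies on,
# assuming the SageMaker Python SDK v2 module layout and the get_session()
# helper commonly found in SageMaker MLOps project templates. Adjust to
# match the real module if it differs.
import os
import time

import boto3
import sagemaker
from sagemaker.inputs import TrainingInput
from sagemaker.pipeline import PipelineModel
from sagemaker.processing import ProcessingInput, ProcessingOutput
from sagemaker.sklearn.model import SKLearnModel
from sagemaker.sklearn.processing import SKLearnProcessor
from sagemaker.workflow.parameters import ParameterInteger, ParameterString
from sagemaker.workflow.pipeline import Pipeline
from sagemaker.workflow.step_collections import RegisterModel
from sagemaker.workflow.steps import ProcessingStep, TrainingStep
from sagemaker.xgboost.estimator import XGBoost
from sagemaker.xgboost.model import XGBoostModel

# Directory containing this file; used to resolve the processing,
# training, and inference scripts referenced below.
BASE_DIR = os.path.dirname(os.path.realpath(__file__))

def get_session(region, default_bucket):
    """Return a sagemaker.session.Session bound to the given region and
    default bucket (assumed helper, as in the SageMaker project templates)."""
    boto_session = boto3.Session(region_name=region)
    sagemaker_client = boto_session.client("sagemaker")
    runtime_client = boto_session.client("sagemaker-runtime")
    return sagemaker.session.Session(
        boto_session=boto_session,
        sagemaker_client=sagemaker_client,
        sagemaker_runtime_client=runtime_client,
        default_bucket=default_bucket,
    )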
def get_pipeline(region,
sagemaker_project_arn=None,
role=None,
default_bucket='',
pipeline_name='end-to-end-ml-sagemaker-pipeline',
model_package_group_name='end-to-end-ml-sm-model-package-group',
base_job_prefix='endtoendmlsm') -> Pipeline:
"""
Gets the SM Pipeline.
:param role: The execution role.
:param bucket_name: The bucket where pipeline artifacts are stored.
:param prefix: The prefix where pipeline artifacts are stored.
:return: A Pipeline instance.
"""
bucket_name = default_bucket
prefix = base_job_prefix
sagemaker_session = get_session(region, bucket_name)
# ---------------------
# Processing parameters
# ---------------------
# The path to the raw data.
raw_data_path = 's3://gianpo-public/endtoendml/data/raw/predmain_raw_data_header.csv'
raw_data_path_param = ParameterString(name="raw_data_path", default_value=raw_data_path)
# The output path to the training data.
train_data_path = 's3://{0}/{1}/data/preprocessed/train/'.format(bucket_name, prefix)
train_data_path_param = ParameterString(name="train_data_path", default_value=train_data_path)
# The output path to the validation data.
val_data_path = 's3://{0}/{1}/data/preprocessed/val/'.format(bucket_name, prefix)
val_data_path_param = ParameterString(name="val_data_path", default_value=val_data_path)
# The output path to the featurizer model.
model_path = 's3://{0}/{1}/output/sklearn/'.format(bucket_name, prefix)
model_path_param = ParameterString(name="model_path", default_value=model_path)
# The instance type for the processing job.
processing_instance_type_param = ParameterString(name="processing_instance_type", default_value='ml.m5.large')
# The instance count for the processing job.
processing_instance_count_param = ParameterInteger(name="processing_instance_count", default_value=1)
# The train/test split ratio parameter.
train_test_split_ratio_param = ParameterString(name="train_test_split_ratio", default_value='0.2')
# -------------------
# Training parameters
# -------------------
# XGB hyperparameters.
max_depth_param = ParameterString(name="max_depth", default_value='3')
eta_param = ParameterString(name="eta", default_value='0.1')
gamma_param = ParameterString(name="gamma", default_value='0')
min_child_weight_param = ParameterString(name="min_child_weight", default_value='1')
objective_param = ParameterString(name="objective", default_value='binary:logistic')
num_round_param = ParameterString(name="num_round", default_value='10')
eval_metric_param = ParameterString(name="eval_metric", default_value='auc')
# The instance type for the training job.
training_instance_type_param = ParameterString(name="training_instance_type", default_value='ml.m5.xlarge')
# The instance count for the training job.
training_instance_count_param = ParameterInteger(name="training_instance_count", default_value=1)
# The training output path for the model.
output_path = 's3://{0}/{1}/output/'.format(bucket_name, prefix)
output_path_param = ParameterString(name="output_path", default_value=output_path)
# --------------------------
# Register model parameters
# --------------------------
# The default instance type for deployment.
deploy_instance_type_param = ParameterString(name="deploy_instance_type", default_value='ml.m5.2xlarge')
# The approval status for models added to the registry.
model_approval_status_param = ParameterString(name="model_approval_status", default_value='PendingManualApproval')
# --------------------------
# Processing Step
# --------------------------
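# The processing step runs dataprep/preprocess.py on a scikit-learn container.
# It ingests the raw CSV, fits the featurizer, performs the train/validation
# split controlled by --train-test-split-ratio, and emits three outputs: the
# preprocessed train and validation sets and the fitted featurizer model.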
sklearn_processor = SKLearnProcessor(role=role,
instance_type=processing_instance_type_param,
instance_count=processing_instance_count_param,
framework_version='0.20.0',
sagemaker_session=sagemaker_session)
inputs = [ProcessingInput(input_name='raw_data',
source=raw_data_path_param, destination='/opt/ml/processing/input')]
outputs = [ProcessingOutput(output_name='train_data',
source='/opt/ml/processing/train', destination=train_data_path_param),
ProcessingOutput(output_name='val_data',
source='/opt/ml/processing/val', destination=val_data_path_param),
ProcessingOutput(output_name='model',
source='/opt/ml/processing/model', destination=model_path_param)]
code_path = os.path.join(BASE_DIR, 'dataprep/preprocess.py')
processing_step = ProcessingStep(
name='Processing',
code=code_path,
processor=sklearn_processor,
inputs=inputs,
outputs=outputs,
job_arguments=['--train-test-split-ratio', train_test_split_ratio_param]
)
# --------------------------
# Training Step
# --------------------------
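# The training step reads its input channels from the processing step's
# properties. These references are resolved at execution time, and they are
# also what makes SageMaker Pipelines order Training after Processing in the
# pipeline DAG.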
hyperparameters = {
"max_depth": max_depth_param,
"eta": eta_param,
"gamma": gamma_param,
"min_child_weight": min_child_weight_param,
"silent": 0,
"objective": objective_param,
"num_round": num_round_param,
"eval_metric": eval_metric_param
}
entry_point = 'train.py'
source_dir = os.path.join(BASE_DIR, 'train/')
code_location = 's3://{0}/{1}/code'.format(bucket_name, prefix)
estimator = XGBoost(
entry_point=entry_point,
source_dir=source_dir,
output_path=output_path_param,
code_location=code_location,
hyperparameters=hyperparameters,
instance_type=training_instance_type_param,
instance_count=training_instance_count_param,
framework_version="0.90-2",
py_version="py3",
role=role,
sagemaker_session=sagemaker_session
)
training_step = TrainingStep(
name='Training',
estimator=estimator,
inputs={
'train': TrainingInput(
s3_data=processing_step.properties.ProcessingOutputConfig.Outputs[
'train_data'
].S3Output.S3Uri,
content_type='text/csv'
),
'validation': TrainingInput(
s3_data=processing_step.properties.ProcessingOutputConfig.Outputs[
'val_data'
].S3Output.S3Uri,
content_type='text/csv'
)
}
)
# --------------------------
# Register Model Step
# --------------------------
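# The model registered below is a serial inference PipelineModel: requests hit
# the scikit-learn featurizer container first, and its output is forwarded to
# the XGBoost container for the final prediction. Both model artifacts come
# from step properties, so this step runs only after Processing and Training.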
sklearn_model = SKLearnModel(name='end-to-end-ml-sm-skl-model-{0}'.format(str(int(time.time()))),
model_data=processing_step.properties.ProcessingOutputConfig.Outputs[
'model'].S3Output.S3Uri,
entry_point='inference.py',
source_dir=os.path.join(BASE_DIR, 'deploy/sklearn/'),
code_location=code_location,
role=role,
sagemaker_session=sagemaker_session,
framework_version='0.20.0',
py_version='py3')
xgboost_model = XGBoostModel(name='end-to-end-ml-sm-xgb-model-{0}'.format(str(int(time.time()))),
model_data=training_step.properties.ModelArtifacts.S3ModelArtifacts,
entry_point='inference.py',
source_dir=os.path.join(BASE_DIR, 'deploy/xgboost/'),
code_location=code_location,
framework_version='0.90-2',
py_version='py3',
role=role,
sagemaker_session=sagemaker_session)
pipeline_model_name = 'end-to-end-ml-sm-xgb-skl-pipeline-{0}'.format(str(int(time.time())))
pipeline_model = PipelineModel(
name=pipeline_model_name,
role=role,
models=[
sklearn_model,
xgboost_model],
sagemaker_session=sagemaker_session)
register_model_step = RegisterModel(
name='RegisterModel',
content_types=['text/csv'],
response_types=['application/json', 'text/csv'],
inference_instances=[deploy_instance_type_param, 'ml.m5.large'],
transform_instances=['ml.c5.4xlarge'],
model_package_group_name=model_package_group_name,
approval_status=model_approval_status_param,
model=pipeline_model
)
# --------------------------
# Pipeline
# --------------------------
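# Every workflow parameter defined above must be declared here so that its
# default can be overridden per execution when the pipeline is started.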
pipeline = Pipeline(
name=pipeline_name,
parameters=[
raw_data_path_param,
train_data_path_param,
val_data_path_param,
model_path_param,
processing_instance_type_param,
processing_instance_count_param,
train_test_split_ratio_param,
max_depth_param,
eta_param,
gamma_param,
min_child_weight_param,
objective_param,
num_round_param,
eval_metric_param,
training_instance_type_param,
training_instance_count_param,
output_path_param,
deploy_instance_type_param,
model_approval_status_param
],
steps=[processing_step, training_step, register_model_step],
sagemaker_session=sagemaker_session,
)
response = pipeline.upsert(role_arn=role)
print(response["PipelineArn"])
return pipeline
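
# A minimal usage sketch (an assumption, not part of the original module):
# build/upsert the pipeline and start an execution overriding one parameter.
# The region, role ARN, and bucket below are placeholders.
if __name__ == "__main__":
    demo_pipeline = get_pipeline(
        region="eu-west-1",
        role="arn:aws:iam::123456789012:role/SageMakerExecutionRole",
        default_bucket="my-sagemaker-bucket",
    )
    execution = demo_pipeline.start(parameters={"train_test_split_ratio": "0.3"})
    print(execution.describe()["PipelineExecutionStatus"])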