# pipelines/customerchurn/pipeline.py
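# NOTE: the imports and get_session helper below are an assumed prelude (they are not shown
# in this excerpt); module paths follow the SageMaker Python SDK v2.x layout used by this
# style of pipeline definition.
import boto3
import sagemaker
import sagemaker.clarify
import sagemaker.session
from sagemaker.estimator import Estimator
from sagemaker.inputs import CreateModelInput, TrainingInput, TransformInput
from sagemaker.model import Model
from sagemaker.model_metrics import MetricsSource, ModelMetrics
from sagemaker.processing import ProcessingInput, ProcessingOutput, ScriptProcessor
from sagemaker.sklearn.processing import SKLearnProcessor
from sagemaker.transformer import Transformer
from sagemaker.tuner import ContinuousParameter, HyperparameterTuner, IntegerParameter
from sagemaker.workflow.condition_step import ConditionStep, JsonGet
from sagemaker.workflow.conditions import ConditionGreaterThan
from sagemaker.workflow.parameters import ParameterInteger, ParameterString
from sagemaker.workflow.pipeline import Pipeline
from sagemaker.workflow.properties import PropertyFile
from sagemaker.workflow.step_collections import RegisterModel
from sagemaker.workflow.steps import CreateModelStep, ProcessingStep, TransformStep, TuningStep


def get_session(region, default_bucket):
    """Assumed helper (referenced below but not shown in this excerpt)."""
    boto_session = boto3.Session(region_name=region)
    sagemaker_client = boto_session.client("sagemaker")
    runtime_client = boto_session.client("sagemaker-runtime")
    return sagemaker.session.Session(
        boto_session=boto_session,
        sagemaker_client=sagemaker_client,
        sagemaker_runtime_client=runtime_client,
        default_bucket=default_bucket,
    )
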
def get_pipeline(
region,
role=None,
default_bucket=None,
model_package_group_name="ChurnModelPackageGroup",
pipeline_name="ChurnModelPipeline",
base_prefix=None,
custom_image_uri=None,
sklearn_processor_version=None,
):
"""Gets a SageMaker ML Pipeline instance working with churn data.
Args:
region: AWS region to create and run the pipeline.
role: IAM role to create and run steps and pipeline.
default_bucket: the bucket to use for storing the artifacts
Returns:
an instance of a pipeline
"""
sagemaker_session = get_session(region, default_bucket)
processing_instance_count = ParameterInteger(
name="ProcessingInstanceCount",
default_value=1
)
processing_instance_type = ParameterString(
name="ProcessingInstanceType",
default_value="ml.m5.xlarge"
)
training_instance_type = ParameterString(
name="TrainingInstanceType",
default_value="ml.m5.xlarge"
)
input_data = ParameterString(
name="InputData",
default_value="s3://{}/data/storedata_total.csv".format(default_bucket),
)
batch_data = ParameterString(
name="BatchData",
default_value="s3://{}/data/batch/batch.csv".format(default_bucket),
)
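# These pipeline parameters can be overridden per execution without changing the
# definition, e.g. pipeline.start(parameters={"TrainingInstanceType": "ml.m5.2xlarge"}).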
# processing step for feature engineering
sklearn_processor = SKLearnProcessor(
framework_version=sklearn_processor_version,
instance_type=processing_instance_type,
instance_count=processing_instance_count,
sagemaker_session=sagemaker_session,
role=role,
)
step_process = ProcessingStep(
name="ChurnModelProcess",
processor=sklearn_processor,
inputs=[
ProcessingInput(source=input_data, destination="/opt/ml/processing/input"),
],
outputs=[
ProcessingOutput(output_name="train", source="/opt/ml/processing/train",\
destination=f"s3://{default_bucket}/output/train" ),
ProcessingOutput(output_name="validation", source="/opt/ml/processing/validation",\
destination=f"s3://{default_bucket}/output/validation"),
ProcessingOutput(output_name="test", source="/opt/ml/processing/test",\
destination=f"s3://{default_bucket}/output/test")
],
code=f"s3://{default_bucket}/input/code/preprocess.py",
)
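# The "train" and "validation" outputs above feed the tuning step, and the "test" output
# feeds the evaluation step, via step_process.properties references resolved at run time.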
# training step for generating model artifacts
model_path = f"s3://{default_bucket}/output"
image_uri = sagemaker.image_uris.retrieve(
framework="xgboost",
region=region,
version="1.0-1",
py_version="py3",
instance_type=training_instance_type,
)
fixed_hyperparameters = {
"eval_metric":"auc",
"objective":"binary:logistic",
"num_round":"100",
"rate_drop":"0.3",
"tweedie_variance_power":"1.4"
}
xgb_train = Estimator(
image_uri=image_uri,
instance_type=training_instance_type,
instance_count=1,
hyperparameters=fixed_hyperparameters,
output_path=model_path,
base_job_name=f"churn-train",
sagemaker_session=sagemaker_session,
role=role,
)
hyperparameter_ranges = {
"eta": ContinuousParameter(0, 1),
"min_child_weight": ContinuousParameter(1, 10),
"alpha": ContinuousParameter(0, 2),
"max_depth": IntegerParameter(1, 10),
}
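# The fixed hyperparameters are applied to every training job; the ranges above are
# searched by the tuner (Bayesian optimization by default) across at most max_jobs trials.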
objective_metric_name = "validation:auc"
step_tuning = TuningStep(
name="ChurnHyperParameterTuning",
tuner=HyperparameterTuner(xgb_train, objective_metric_name, hyperparameter_ranges, max_jobs=2, max_parallel_jobs=2),
inputs={
"train": TrainingInput(
s3_data=step_process.properties.ProcessingOutputConfig.Outputs[
"train"
].S3Output.S3Uri,
content_type="text/csv",
),
"validation": TrainingInput(
s3_data=step_process.properties.ProcessingOutputConfig.Outputs[
"validation"
].S3Output.S3Uri,
content_type="text/csv",
),
},
)
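# After the tuning job completes, step_tuning.get_top_model_s3_uri(top_k=0, ...) below
# resolves to the model artifact of the best training job ranked by validation:auc.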
# processing step for evaluation
script_eval = ScriptProcessor(
image_uri=image_uri,
command=["python3"],
instance_type=processing_instance_type,
instance_count=1,
base_job_name="script-churn-eval",
role=role,
sagemaker_session=sagemaker_session,
)
evaluation_report = PropertyFile(
name="ChurnEvaluationReport",
output_name="evaluation",
path="evaluation.json",
)
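# The PropertyFile exposes values from evaluation.json (written by evaluate.py) so the
# ConditionStep can read them with JsonGet instead of parsing the report itself.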
step_eval = ProcessingStep(
name="ChurnEvalBestModel",
processor=script_eval,
inputs=[
ProcessingInput(
source=step_tuning.get_top_model_s3_uri(top_k=0, s3_bucket=default_bucket, prefix="output"),
destination="/opt/ml/processing/model"
),
ProcessingInput(
source=step_process.properties.ProcessingOutputConfig.Outputs[
"test"
].S3Output.S3Uri,
destination="/opt/ml/processing/test"
)
],
outputs=[
ProcessingOutput(output_name="evaluation", source="/opt/ml/processing/evaluation",\
destination=f"s3://{default_bucket}/output/evaluation"),
],
code=f"s3://{default_bucket}/input/code/evaluate.py",
property_files=[evaluation_report],
)
# step to create model
model = Model(
image_uri=image_uri,
model_data=step_tuning.get_top_model_s3_uri(top_k=0, s3_bucket=default_bucket, prefix="output"),
sagemaker_session=sagemaker_session,
role=role,
)
inputs = CreateModelInput(
instance_type="ml.m5.large",
accelerator_type="ml.eia1.medium",
)
step_create_model = CreateModelStep(
name="ChurnCreateModel",
model=model,
inputs=inputs,
)
# step to perform batch transformation
transformer = Transformer(
model_name=step_create_model.properties.ModelName,
instance_type="ml.m5.xlarge",
instance_count=1,
output_path=f"s3://{default_bucket}/ChurnTransform"
)
step_transform = TransformStep(
name="ChurnTransform",
transformer=transformer,
inputs=TransformInput(data=batch_data, content_type="text/csv"),
)
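# The transform job scores the CSV referenced by the BatchData parameter with the model
# created above and writes predictions under s3://<default_bucket>/ChurnTransform.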
# register model step that will be conditionally executed
model_metrics = ModelMetrics(
model_statistics=MetricsSource(
s3_uri="s3://{}/evaluation.json".format(default_bucket),
content_type="application/json"
)
)
step_register = RegisterModel(
name="RegisterChurnModel",
estimator=xgb_train,
model_data=step_tuning.get_top_model_s3_uri(top_k=0, s3_bucket=default_bucket, prefix="output"),
content_types=["text/csv"],
response_types=["text/csv"],
inference_instances=["ml.t2.medium", "ml.m5.large"],
transform_instances=["ml.m5.large"],
model_package_group_name=model_package_group_name,
model_metrics=model_metrics,
)
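# RegisterModel adds a model package, with the evaluation metrics attached, to the model
# package group; without an explicit approval_status it defaults to PendingManualApproval.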
# Processing Step to generate analysis config file for Clarify Step
bias_report_output_path = f"s3://{default_bucket}/clarify-output/bias"
clarify_instance_type = 'ml.c5.xlarge'
analysis_config_path = f"s3://{default_bucket}/clarify-output/bias/analysis_config.json"
script_processor = ScriptProcessor(
command=['python3'],
image_uri=custom_image_uri,
role=role,
instance_count=1,
instance_type=processing_instance_type,
sagemaker_session=sagemaker_session,
)
step_config_file = ProcessingStep(
name="ChurnModelConfigFile",
processor=script_processor,
code=f"s3://{default_bucket}/input/code/generate_config.py",
job_arguments=["--modelname",step_create_model.properties.ModelName,"--bias-report-output-path",bias_report_output_path,"--clarify-instance-type",clarify_instance_type,\
"--default-bucket",default_bucket,"--num-baseline-samples","50","--instance-count","1"],
depends_on= [step_create_model.name]
)
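# step_create_model.properties.ModelName is resolved at run time and handed to
# generate_config.py as a CLI argument; that script is assumed to write
# analysis_config.json to the bias_report_output_path defined above.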
# clarify step
data_config = sagemaker.clarify.DataConfig(
s3_data_input_path=f's3://{default_bucket}/output/train/train.csv',
s3_output_path=bias_report_output_path,
label=0,
headers=[
'target', 'esent', 'eopenrate', 'eclickrate', 'avgorder', 'ordfreq', 'paperless',
'refill', 'doorstep', 'first_last_days_diff', 'created_first_days_diff',
'favday_Friday', 'favday_Monday', 'favday_Saturday', 'favday_Sunday',
'favday_Thursday', 'favday_Tuesday', 'favday_Wednesday',
'city_BLR', 'city_BOM', 'city_DEL', 'city_MAA',
],
dataset_type="text/csv",
)
clarify_processor = sagemaker.clarify.SageMakerClarifyProcessor(
role=role,
instance_count=1,
instance_type=clarify_instance_type,
sagemaker_session=sagemaker_session,
)
config_input = ProcessingInput(
input_name="analysis_config",
source=analysis_config_path,
destination="/opt/ml/processing/input/analysis_config",
s3_data_type="S3Prefix",
s3_input_mode="File",
s3_compression_type="None",
)
data_input = ProcessingInput(
input_name="dataset",
source=data_config.s3_data_input_path,
destination="/opt/ml/processing/input/data",
s3_data_type="S3Prefix",
s3_input_mode="File",
s3_data_distribution_type=data_config.s3_data_distribution_type,
s3_compression_type=data_config.s3_compression_type,
)
result_output = ProcessingOutput(
source="/opt/ml/processing/output",
destination=data_config.s3_output_path,
output_name="analysis_result",
s3_upload_mode="EndOfJob",
)
step_clarify = ProcessingStep(
name="ClarifyProcessingStep",
processor=clarify_processor,
inputs=[data_input, config_input],
outputs=[result_output],
depends_on=[step_config_file.name],
)
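# depends_on is used here (rather than a property reference) because the Clarify job reads
# the analysis_config.json produced by the previous step from a fixed S3 path.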
# condition step for evaluating model quality and branching execution
cond_gt = ConditionGreaterThan(
left=JsonGet(
step=step_eval,
property_file=evaluation_report,
json_path="classification_metrics.auc_score.value"
),
right=0.75,
)
step_cond = ConditionStep(
name="CheckAUCScoreChurnEvaluation",
conditions=[cond_gt],
if_steps=[step_register, step_create_model, step_config_file, step_transform, step_clarify],
else_steps=[],
)
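# If the evaluated AUC is not greater than 0.75, the pipeline stops after evaluation;
# no else branch is defined.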
# pipeline instance
pipeline = Pipeline(
name=pipeline_name,
parameters=[
processing_instance_type,
processing_instance_count,
training_instance_type,
input_data,
batch_data,
],
steps=[step_process, step_tuning, step_eval, step_cond],
sagemaker_session=sagemaker_session,
)
return pipeline
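

# Example usage (illustrative sketch; the region, role ARN, bucket, image URI, and
# framework version below are placeholders, not values defined by this module):
#
#   pipeline = get_pipeline(
#       region="us-east-1",
#       role="arn:aws:iam::111122223333:role/SageMakerExecutionRole",
#       default_bucket="my-churn-bucket",
#       custom_image_uri="111122223333.dkr.ecr.us-east-1.amazonaws.com/churn-config:latest",
#       sklearn_processor_version="0.23-1",
#   )
#   pipeline.upsert(role_arn="arn:aws:iam::111122223333:role/SageMakerExecutionRole")
#   execution = pipeline.start()
#   execution.wait()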