def get_pipeline()

in pipelines/customerchurn/pipeline.py [0:0]


def get_pipeline(
    region,
    role=None,
    default_bucket=None,
    model_package_group_name="ChurnModelPackageGroup",
    pipeline_name="ChurnModelPipeline",
    base_prefix=None,
    custom_image_uri=None,
    sklearn_processor_version=None,
):
    """Gets a SageMaker ML Pipeline instance working with churn data.

    The pipeline runs, in order: a preprocessing step, a hyperparameter
    tuning step, an evaluation step for the best tuned model, and a
    condition step. When the evaluated AUC is greater than 0.75 the
    condition step registers the model, creates a SageMaker model, runs a
    batch transform, generates a Clarify analysis config and runs Clarify.

    Args:
        region: AWS region to create and run the pipeline.
        role: IAM role to create and run steps and pipeline.
        default_bucket: the bucket to use for storing the artifacts. It is
            interpolated into every S3 URI built here, so leaving it as
            ``None`` yields ``s3://None/...`` locations.
            NOTE(review): callers presumably always pass a bucket -- confirm.
        model_package_group_name: model package group the model is
            registered into.
        pipeline_name: name given to the returned pipeline.
        base_prefix: currently unused; kept for interface compatibility.
        custom_image_uri: container image for the processor that runs
            ``generate_config.py``.
        sklearn_processor_version: scikit-learn framework version for the
            preprocessing processor.

    Returns:
        an instance of a pipeline
    """
    sagemaker_session = get_session(region, default_bucket)

    # S3 locations shared by several steps.
    output_prefix = f"s3://{default_bucket}/output"
    code_prefix = f"s3://{default_bucket}/input/code"
    evaluation_output_uri = f"{output_prefix}/evaluation"
    evaluation_file_name = "evaluation.json"

    # pipeline parameters that can be overridden per execution
    processing_instance_count = ParameterInteger(
        name="ProcessingInstanceCount",
        default_value=1,
    )
    processing_instance_type = ParameterString(
        name="ProcessingInstanceType",
        default_value="ml.m5.xlarge",
    )
    training_instance_type = ParameterString(
        name="TrainingInstanceType",
        default_value="ml.m5.xlarge",
    )
    input_data = ParameterString(
        name="InputData",
        default_value=f"s3://{default_bucket}/data/storedata_total.csv",
    )
    batch_data = ParameterString(
        name="BatchData",
        default_value=f"s3://{default_bucket}/data/batch/batch.csv",
    )

    # processing step for feature engineering
    sklearn_processor = SKLearnProcessor(
        framework_version=sklearn_processor_version,
        instance_type=processing_instance_type,
        instance_count=processing_instance_count,
        sagemaker_session=sagemaker_session,
        role=role,
    )
    step_process = ProcessingStep(
        name="ChurnModelProcess",
        processor=sklearn_processor,
        inputs=[
            ProcessingInput(source=input_data, destination="/opt/ml/processing/input"),
        ],
        outputs=[
            ProcessingOutput(
                output_name="train",
                source="/opt/ml/processing/train",
                destination=f"{output_prefix}/train",
            ),
            ProcessingOutput(
                output_name="validation",
                source="/opt/ml/processing/validation",
                destination=f"{output_prefix}/validation",
            ),
            ProcessingOutput(
                output_name="test",
                source="/opt/ml/processing/test",
                destination=f"{output_prefix}/test",
            ),
        ],
        code=f"{code_prefix}/preprocess.py",
    )

    # training step for generating model artifacts
    model_path = output_prefix
    image_uri = sagemaker.image_uris.retrieve(
        framework="xgboost",
        region=region,
        version="1.0-1",
        py_version="py3",
        instance_type=training_instance_type,
    )
    fixed_hyperparameters = {
        "eval_metric": "auc",
        "objective": "binary:logistic",
        "num_round": "100",
        "rate_drop": "0.3",
        "tweedie_variance_power": "1.4",
    }
    xgb_train = Estimator(
        image_uri=image_uri,
        instance_type=training_instance_type,
        instance_count=1,
        hyperparameters=fixed_hyperparameters,
        output_path=model_path,
        base_job_name="churn-train",
        sagemaker_session=sagemaker_session,
        role=role,
    )
    hyperparameter_ranges = {
        "eta": ContinuousParameter(0, 1),
        "min_child_weight": ContinuousParameter(1, 10),
        "alpha": ContinuousParameter(0, 2),
        "max_depth": IntegerParameter(1, 10),
    }
    objective_metric_name = "validation:auc"

    processed_outputs = step_process.properties.ProcessingOutputConfig.Outputs
    step_tuning = TuningStep(
        name="ChurnHyperParameterTuning",
        tuner=HyperparameterTuner(
            xgb_train,
            objective_metric_name,
            hyperparameter_ranges,
            max_jobs=2,
            max_parallel_jobs=2,
        ),
        inputs={
            "train": TrainingInput(
                s3_data=processed_outputs["train"].S3Output.S3Uri,
                content_type="text/csv",
            ),
            "validation": TrainingInput(
                s3_data=processed_outputs["validation"].S3Output.S3Uri,
                content_type="text/csv",
            ),
        },
    )

    # Artifact location of the best training job (top_k=0) of the tuning
    # step; evaluation, model creation and registration all use this model.
    best_model_uri = step_tuning.get_top_model_s3_uri(
        top_k=0,
        s3_bucket=default_bucket,
        prefix="output",
    )

    # processing step for evaluation
    script_eval = ScriptProcessor(
        image_uri=image_uri,
        command=["python3"],
        instance_type=processing_instance_type,
        instance_count=1,
        base_job_name="script-churn-eval",
        role=role,
        sagemaker_session=sagemaker_session,
    )
    evaluation_report = PropertyFile(
        name="ChurnEvaluationReport",
        output_name="evaluation",
        path=evaluation_file_name,
    )
    step_eval = ProcessingStep(
        name="ChurnEvalBestModel",
        processor=script_eval,
        inputs=[
            ProcessingInput(
                source=best_model_uri,
                destination="/opt/ml/processing/model",
            ),
            ProcessingInput(
                source=processed_outputs["test"].S3Output.S3Uri,
                destination="/opt/ml/processing/test",
            ),
        ],
        outputs=[
            ProcessingOutput(
                output_name="evaluation",
                source="/opt/ml/processing/evaluation",
                destination=evaluation_output_uri,
            ),
        ],
        code=f"{code_prefix}/evaluate.py",
        property_files=[evaluation_report],
    )

    # step to create model
    model = Model(
        image_uri=image_uri,
        model_data=best_model_uri,
        sagemaker_session=sagemaker_session,
        role=role,
    )
    create_model_inputs = CreateModelInput(
        instance_type="ml.m5.large",
        accelerator_type="ml.eia1.medium",
    )
    step_create_model = CreateModelStep(
        name="ChurnCreateModel",
        model=model,
        inputs=create_model_inputs,
    )

    # step to perform batch transformation
    transformer = Transformer(
        model_name=step_create_model.properties.ModelName,
        instance_type="ml.m5.xlarge",
        instance_count=1,
        output_path=f"s3://{default_bucket}/ChurnTransform",
        # Use the same session as every other step instead of letting the
        # Transformer build a default one, which may target another region.
        sagemaker_session=sagemaker_session,
    )
    step_transform = TransformStep(
        name="ChurnTransform",
        transformer=transformer,
        inputs=TransformInput(data=batch_data, content_type="text/csv"),
    )

    # register model step that will be conditionally executed
    model_metrics = ModelMetrics(
        model_statistics=MetricsSource(
            # Must point at the report written by the evaluation step, i.e.
            # the property file inside that step's "evaluation" output.
            s3_uri=f"{evaluation_output_uri}/{evaluation_file_name}",
            content_type="application/json",
        )
    )
    step_register = RegisterModel(
        name="RegisterChurnModel",
        estimator=xgb_train,
        model_data=best_model_uri,
        content_types=["text/csv"],
        response_types=["text/csv"],
        inference_instances=["ml.t2.medium", "ml.m5.large"],
        transform_instances=["ml.m5.large"],
        model_package_group_name=model_package_group_name,
        model_metrics=model_metrics,
    )

    # Processing Step to generate analysis config file for Clarify Step
    bias_report_output_path = f"s3://{default_bucket}/clarify-output/bias"
    clarify_instance_type = "ml.c5.xlarge"
    analysis_config_path = f"{bias_report_output_path}/analysis_config.json"
    script_processor = ScriptProcessor(
        command=["python3"],
        image_uri=custom_image_uri,
        role=role,
        instance_count=1,
        instance_type=processing_instance_type,
        sagemaker_session=sagemaker_session,
    )
    step_config_file = ProcessingStep(
        name="ChurnModelConfigFile",
        processor=script_processor,
        code=f"{code_prefix}/generate_config.py",
        job_arguments=[
            "--modelname", step_create_model.properties.ModelName,
            "--bias-report-output-path", bias_report_output_path,
            "--clarify-instance-type", clarify_instance_type,
            "--default-bucket", default_bucket,
            "--num-baseline-samples", "50",
            "--instance-count", "1",
        ],
        depends_on=[step_create_model.name],
    )

    # clarify step
    data_config = sagemaker.clarify.DataConfig(
        s3_data_input_path=f"{output_prefix}/train/train.csv",
        s3_output_path=bias_report_output_path,
        label=0,
        headers=[
            "target",
            "esent",
            "eopenrate",
            "eclickrate",
            "avgorder",
            "ordfreq",
            "paperless",
            "refill",
            "doorstep",
            "first_last_days_diff",
            "created_first_days_diff",
            "favday_Friday",
            "favday_Monday",
            "favday_Saturday",
            "favday_Sunday",
            "favday_Thursday",
            "favday_Tuesday",
            "favday_Wednesday",
            "city_BLR",
            "city_BOM",
            "city_DEL",
            "city_MAA",
        ],
        dataset_type="text/csv",
    )
    clarify_processor = sagemaker.clarify.SageMakerClarifyProcessor(
        role=role,
        instance_count=1,
        instance_type=clarify_instance_type,
        sagemaker_session=sagemaker_session,
    )
    config_input = ProcessingInput(
        input_name="analysis_config",
        source=analysis_config_path,
        destination="/opt/ml/processing/input/analysis_config",
        s3_data_type="S3Prefix",
        s3_input_mode="File",
        s3_compression_type="None",
    )
    data_input = ProcessingInput(
        input_name="dataset",
        source=data_config.s3_data_input_path,
        destination="/opt/ml/processing/input/data",
        s3_data_type="S3Prefix",
        s3_input_mode="File",
        s3_data_distribution_type=data_config.s3_data_distribution_type,
        s3_compression_type=data_config.s3_compression_type,
    )
    result_output = ProcessingOutput(
        source="/opt/ml/processing/output",
        destination=data_config.s3_output_path,
        output_name="analysis_result",
        s3_upload_mode="EndOfJob",
    )
    step_clarify = ProcessingStep(
        name="ClarifyProcessingStep",
        processor=clarify_processor,
        inputs=[data_input, config_input],
        outputs=[result_output],
        # The analysis config is written by the config-file step, so Clarify
        # has to wait for it even though no property is referenced.
        depends_on=[step_config_file.name],
    )

    # condition step for evaluating model quality and branching execution
    cond_auc_above_threshold = ConditionGreaterThan(
        left=JsonGet(
            step=step_eval,
            property_file=evaluation_report,
            json_path="classification_metrics.auc_score.value",
        ),
        right=0.75,
    )
    step_cond = ConditionStep(
        name="CheckAUCScoreChurnEvaluation",
        conditions=[cond_auc_above_threshold],
        if_steps=[
            step_register,
            step_create_model,
            step_config_file,
            step_transform,
            step_clarify,
        ],
        else_steps=[],
    )

    # pipeline instance
    pipeline = Pipeline(
        name=pipeline_name,
        parameters=[
            processing_instance_type,
            processing_instance_count,
            training_instance_type,
            input_data,
            batch_data,
        ],
        steps=[step_process, step_tuning, step_eval, step_cond],
        sagemaker_session=sagemaker_session,
    )
    return pipeline