## Get started with SageMaker
In this notebook you'll learn how SageMaker can be used to:

1. Preprocess (and optionally explore) a dataset.
2. Train an XGBoost classifier for customer churn prediction with a managed SageMaker Training job and a SageMaker-managed XGBoost image.
3. Find the best-performing set of hyperparameters with a managed SageMaker Hyperparameter Tuning job.
4. Run batch inference with a managed SageMaker Batch Transform job.
5. Deploy the model to a managed real-time SageMaker endpoint.

All SageMaker resources are created with the SageMaker Core SDK. You can find more information about sagemaker-core [here](https://sagemaker-core.readthedocs.io/en/latest/).

In [None]:
%pip install --upgrade pip -q
%pip install sagemaker-core -q

In [None]:
import time
from sagemaker_core.helper.session_helper import Session, get_execution_role

# Set up region, role and bucket parameters used throughout the notebook.
sagemaker_session = Session()
region = sagemaker_session.boto_region_name
role = get_execution_role()
bucket = sagemaker_session.default_bucket()

print(f"AWS region: {region}")
print(f"Execution role: {role}")
print(f"Default S3 bucket: {bucket}")

## Preprocess dataset
We'll use a synthetic dataset that AWS provides for customer churn prediction.


<div class="alert alert-block alert-info">
<b>NOTE:</b> This sample doesn't perform any exploratory data analysis, because the required preprocessing steps are already known.
    
If you're interested in exploratory analysis, the Amazon SageMaker Examples documentation includes an exploration of this dataset [here](https://sagemaker-examples.readthedocs.io/en/latest/introduction_to_applying_machine_learning/xgboost_customer_churn/xgboost_customer_churn.html).
</div>

#### Read the data from S3

In [None]:
from io import StringIO
import pandas as pd

data = sagemaker_session.read_s3_file(
    f"sagemaker-example-files-prod-{region}",
    "datasets/tabular/synthetic/churn.txt"
)

df = pd.read_csv(StringIO(data))
df

#### Apply processing

In [None]:
from sklearn.model_selection import train_test_split

# Phone number is unique - will not add value to classifier
df = df.drop("Phone", axis=1)

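# Cast Area Code to a non-numeric (object) type so it is one-hot encoded as a category
df["Area Code"] = df["Area Code"].astype(object)

# Remove one feature from each highly correlated pair
df = df.drop(["Day Charge", "Eve Charge", "Night Charge", "Intl Charge"], axis=1)

# One-hot encode categorical features into numeric features
model_data = pd.get_dummies(df)
# Put the target ("Churn?_True.") in the first column and drop both target dummies from the features.
# The built-in XGBoost algorithm expects CSV input with the label in the first column and no header.
model_data = pd.concat(
    [model_data["Churn?_True."], model_data.drop(["Churn?_False.", "Churn?_True."], axis=1)], axis=1
)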
model_data = model_data.astype(float)

# Split data into train and validation datasets
train_data, validation_data = train_test_split(
    model_data, test_size=0.33, random_state=42)

# Further split the validation dataset into test and validation datasets.
validation_data, test_data = train_test_split(
    validation_data, test_size=0.33, random_state=42)

# Remove and store the target column of the test data. It's used after training to compute performance metrics on unseen data.
test_target_column = test_data['Churn?_True.']
test_data = test_data.drop(['Churn?_True.'], axis=1)

# Store all datasets locally
train_data.to_csv("train.csv", header=False, index=False)
validation_data.to_csv("validation.csv", header=False, index=False)
test_data.to_csv("test.csv", header=False, index=False)

# Upload each dataset to S3
s3_train_input = sagemaker_session.upload_data('train.csv', bucket)
s3_validation_input = sagemaker_session.upload_data('validation.csv', bucket)
s3_test_input = sagemaker_session.upload_data('test.csv', bucket)

print('Datasets uploaded to:')
print(s3_train_input)
print(s3_validation_input)
print(s3_test_input)
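
The column reshaping above is easiest to see on a small frame. The sketch below applies the same steps to a hypothetical six-row frame whose column names are modeled on the churn dataset (all values are made up), so you can check the resulting layout: target first, both target dummies removed from the features, `Area Code` one-hot encoded, everything cast to float. It also computes the approximate split fractions produced by two successive `test_size=0.33` splits; `train_test_split` rounds to whole rows, so real sizes differ slightly.

```python
import pandas as pd

# Hypothetical rows; column names mimic the churn dataset, values are invented.
df = pd.DataFrame({
    "State": ["KS", "OH", "NJ", "OH", "KS", "NJ"],
    "Area Code": [415, 408, 415, 510, 408, 510],
    "Day Mins": [265.1, 161.6, 243.4, 299.4, 166.7, 223.4],
    "Churn?": ["False.", "True.", "False.", "True.", "False.", "False."],
})

# Treat Area Code as a label rather than a number so get_dummies encodes it.
df["Area Code"] = df["Area Code"].astype(object)

# One-hot encode object columns (State, Area Code, Churn?).
model_data = pd.get_dummies(df)

# Label first, then the features without either target dummy, all as floats.
model_data = pd.concat(
    [model_data["Churn?_True."], model_data.drop(["Churn?_False.", "Churn?_True."], axis=1)],
    axis=1,
).astype(float)

print(model_data.columns.tolist())

# Approximate share of rows in each dataset after the two 0.33 splits.
train_frac = 1 - 0.33            # kept for training by the first split
val_frac = 0.33 * (1 - 0.33)     # remaining validation share after the second split
test_frac = 0.33 * 0.33          # test share carved out of the first validation set
print(round(train_frac, 4), round(val_frac, 4), round(test_frac, 4))
```

Keeping the label in column 0 and writing the CSV files with `header=False, index=False` is what lets the training channels be consumed directly by the built-in algorithm.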

## Train a classifier using XGBoost
Use SageMaker Training and the managed XGBoost image to train a classifier. <br />
More details on how to use SageMaker managed training with XGBoost can be found [here](https://docs.aws.amazon.com/sagemaker/latest/dg/xgboost.html).

<div class="alert alert-block alert-info">
  <b>NOTE:</b> The image URI below is for the managed XGBoost image in the <code>eu-west-1</code> region. ECR paths differ by AWS region, so update the URI to match your selected region. For the ECR paths of SageMaker managed container images in each region, see the
  <a href="https://docs.aws.amazon.com/sagemaker/latest/dg-ecr-paths/sagemaker-algo-docker-registry-paths.html" target="_blank">documentation</a>.
</div>


In [None]:
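# Managed XGBoost 1.7-1 image in eu-west-1. The registry account ID and region in this URI vary by AWS region.
image = '141502667606.dkr.ecr.eu-west-1.amazonaws.com/sagemaker-xgboost:1.7-1'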
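
Because the image URI is hard-coded for one region, a cheap safeguard is to compare the region embedded in the URI with the session region before launching any jobs. The sketch below assumes the standard ECR URI layout `<account>.dkr.ecr.<region>.amazonaws.com/<repository>:<tag>`; the helper name `ecr_image_region` is hypothetical and not part of sagemaker-core.

```python
import re

# Matches <12-digit account>.dkr.ecr.<region>.amazonaws.com[.cn]/...
_ECR_URI = re.compile(r"^(\d{12})\.dkr\.ecr\.([a-z0-9-]+)\.amazonaws\.com(?:\.cn)?/")


def ecr_image_region(image_uri: str) -> str:
    """Return the AWS region embedded in an ECR image URI (hypothetical helper)."""
    match = _ECR_URI.match(image_uri)
    if match is None:
        raise ValueError(f"Not an ECR image URI: {image_uri}")
    return match.group(2)


image_uri = "141502667606.dkr.ecr.eu-west-1.amazonaws.com/sagemaker-xgboost:1.7-1"
session_region = "eu-west-1"  # in the notebook, use `region` from the setup cell

if ecr_image_region(image_uri) != session_region:
    raise ValueError(
        f"Image is in {ecr_image_region(image_uri)}, but the session is in {session_region}"
    )
print(ecr_image_region(image_uri))
```

In the notebook itself, the equivalent check would be `assert ecr_image_region(image) == region`.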

In [None]:
from sagemaker_core.resources import TrainingJob
from sagemaker_core.shapes import (
    AlgorithmSpecification, Channel, DataSource, S3DataSource,
    ResourceConfig, StoppingCondition, OutputDataConfig,
)

job_name = 'xgboost-churn-' + time.strftime("%Y-%m-%d-%H-%M-%S", time.gmtime())  # Name of training job
instance_type = 'ml.m4.xlarge'  # SageMaker instance type to use for training
instance_count = 1  # Number of instances to use for training
volume_size_in_gb = 30  # Amount of storage to allocate to training job
max_runtime_in_seconds = 600  # Maximum runtime; the job is stopped if it hasn't finished by then
s3_output_path = f"s3://{bucket}"  # Bucket (and optional prefix) where the training job stores output artifacts, such as the model artifact

# Specify hyperparameters
hyper_parameters = {
    "max_depth": "5",
    "eta": "0.2",
    "gamma": "4",
    "min_child_weight": "6",
    "subsample": "0.8",
    "verbosity": "0",
    "objective": "binary:logistic",
    "num_round": "100",
}

# Create training job.
training_job = TrainingJob.create(
    training_job_name=job_name,
    hyper_parameters=hyper_parameters,
    algorithm_specification=AlgorithmSpecification(
        training_image=image,
        training_input_mode='File'
    ),
    role_arn=role,
    input_data_config=[
        Channel(
            channel_name='train',
            content_type='csv',
            data_source=DataSource(
                s3_data_source=S3DataSource(
                    s3_data_type='S3Prefix',
                    s3_uri=s3_train_input,
                    s3_data_distribution_type='FullyReplicated'
                )
            )
        ),
        Channel(
            channel_name='validation',
            content_type='csv',
            data_source=DataSource(
                s3_data_source=S3DataSource(
                    s3_data_type='S3Prefix',
                    s3_uri=s3_validation_input,
                    s3_data_distribution_type='FullyReplicated'
                )
            )
        )
    ],
    output_data_config=OutputDataConfig(
        s3_output_path=s3_output_path
    ),
    resource_config=ResourceConfig(
        instance_type=instance_type,
        instance_count=instance_count,
        volume_size_in_gb=volume_size_in_gb
    ),
    stopping_condition=StoppingCondition(
        max_runtime_in_seconds=max_runtime_in_seconds
    )
)

# Wait for the training job to complete
training_job.wait()
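
SageMaker passes hyperparameters to the training container as a string-to-string map, which is why every value in `hyper_parameters` above is quoted, including numbers. If you prefer to keep hyperparameters as native Python numbers, a small conversion step like the one below can produce the map; `to_hyperparameter_strings` is a hypothetical helper, not a sagemaker-core function.

```python
def to_hyperparameter_strings(params):
    """Convert hyperparameter values to the string-to-string map SageMaker expects."""
    converted = {}
    for name, value in params.items():
        if value is None:
            # A missing value is almost certainly a mistake; fail early.
            raise ValueError(f"Hyperparameter {name!r} has no value")
        converted[str(name)] = str(value)
    return converted


# Same settings as the training job above, written as native Python values.
numeric_params = {
    "max_depth": 5,
    "eta": 0.2,
    "gamma": 4,
    "min_child_weight": 6,
    "subsample": 0.8,
    "verbosity": 0,
    "objective": "binary:logistic",
    "num_round": 100,
}

hyper_parameters = to_hyperparameter_strings(numeric_params)
print(hyper_parameters)
```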

## Hyperparameter tuning
When good hyperparameter values aren't known in advance, you can run a SageMaker Hyperparameter Tuning job, which launches multiple training jobs and uses their results to search for the best-performing set of hyperparameters.

At a high level, a tuning job consists of two main components:
- `HyperParameterTrainingJobDefinition`, which specifies the details of each individual training job, such as the image to use, the input channels and the resource configuration.
- `HyperParameterTuningJobConfig`, which specifies the tuning configuration, such as the search strategy, the number of training jobs to run and the hyperparameters to tune.

You can find more information about how it works [here](https://docs.aws.amazon.com/sagemaker/latest/dg/automatic-model-tuning-how-it-works.html).

In [None]:
from sagemaker_core.resources import HyperParameterTuningJob
from sagemaker_core.shapes import (
    HyperParameterTuningJobConfig, ResourceLimits, ParameterRanges, AutoParameter, Autotune,
    HyperParameterTrainingJobDefinition, HyperParameterTuningJobObjective, HyperParameterAlgorithmSpecification,
    OutputDataConfig, StoppingCondition, ResourceConfig, Channel, DataSource, S3DataSource,
)

tuning_job_name = 'xgboost-tune-' + time.strftime("%Y-%m-%d-%H-%M-%S", time.gmtime())  # Name of tuning job

max_number_of_training_jobs = 50  # Maximum number of training jobs to run as part of the tuning job.
max_parallel_training_jobs = 5  # Maximum number of training jobs to run in parallel.
max_runtime_in_seconds = 3600  # Maximum runtime (seconds) for each training job; the tuning job uses the same limit.

# Create HyperParameterTrainingJobDefinition object, containing information about each individual training job.
hyper_parameter_training_job_defintion = HyperParameterTrainingJobDefinition(
        role_arn=role,
        algorithm_specification=HyperParameterAlgorithmSpecification(
            training_image=image,
            training_input_mode='File'
        ),
        input_data_config=[
            Channel(
                channel_name='train',
                content_type='csv',
                data_source=DataSource(
                    s3_data_source=S3DataSource(
                        s3_data_type='S3Prefix',
                        s3_uri=s3_train_input,
                        s3_data_distribution_type='FullyReplicated'
                    )
                )
            ),
            Channel(
                channel_name='validation',
                content_type='csv',
                data_source=DataSource(
                    s3_data_source=S3DataSource(
                        s3_data_type='S3Prefix',
                        s3_uri=s3_validation_input,
                        s3_data_distribution_type='FullyReplicated'
                    )
                )
            )
        ],
        output_data_config=OutputDataConfig(
            s3_output_path=s3_output_path
        ),
        stopping_condition=StoppingCondition(
            max_runtime_in_seconds=max_runtime_in_seconds
        ),
        resource_config=ResourceConfig(
            instance_type=instance_type,
            instance_count=instance_count,
            volume_size_in_gb=volume_size_in_gb,
        )
    )

# Create HyperParameterTuningJobConfig object, containing the tuning strategy, objective, resource limits and parameter ranges
tuning_job_config = HyperParameterTuningJobConfig(
        strategy='Bayesian',
        hyper_parameter_tuning_job_objective=HyperParameterTuningJobObjective(
            type='Maximize',
            metric_name='validation:auc'
        ),
        resource_limits=ResourceLimits(
            max_number_of_training_jobs=max_number_of_training_jobs,
            max_parallel_training_jobs=max_parallel_training_jobs,
            max_runtime_in_seconds=max_runtime_in_seconds
        ),
        training_job_early_stopping_type='Auto',
        parameter_ranges=ParameterRanges(
            auto_parameters=[
                AutoParameter(
                    name='max_depth',
                    value_hint='5'
                ),
                AutoParameter(
                    name='eta',
                    value_hint='0.1'
                ),
                AutoParameter(
                    name='gamma',
                    value_hint='8'
                ),
                AutoParameter(
                    name='min_child_weight',
                    value_hint='2'
                ),
                AutoParameter(
                    name='subsample',
                    value_hint='0.5'
                ),
                AutoParameter(
                    name='num_round',
                    value_hint='50'
                )
            ]
        )
    )

# Create the tuning job from the two configuration objects above
tuning_job = HyperParameterTuningJob.create(
    hyper_parameter_tuning_job_name=tuning_job_name,
    autotune=Autotune(
        mode='Enabled'
    ),
    training_job_definition=hyper_parameter_training_job_defintion,
    hyper_parameter_tuning_job_config=tuning_job_config
)

tuning_job.wait()
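
Once the tuning job finishes, SageMaker reports the winning training job in `best_training_job`, which the next section uses. Conceptually, "best" means the job with the highest value of the objective metric (here `validation:auc` with `Maximize`). The sketch below illustrates that selection on hypothetical results; it is not SageMaker's internal implementation, and `TrialSummary` and `pick_best` are illustrative names only.

```python
from dataclasses import dataclass
from typing import List, Optional


@dataclass
class TrialSummary:
    """Hypothetical stand-in for one training job's result within a tuning job."""
    training_job_name: str
    status: str
    objective_value: Optional[float]  # None if the job never reported the metric


def pick_best(trials: List[TrialSummary], objective_type: str = "Maximize") -> TrialSummary:
    # Jobs without a reported objective value (e.g. failed ones) can't be ranked.
    candidates = [t for t in trials if t.objective_value is not None]
    if not candidates:
        raise ValueError("No training job reported the objective metric")
    if objective_type == "Maximize":
        return max(candidates, key=lambda t: t.objective_value)
    if objective_type == "Minimize":
        return min(candidates, key=lambda t: t.objective_value)
    raise ValueError(f"Unknown objective type: {objective_type}")


# Made-up results for illustration, not output from a real tuning job.
trials = [
    TrialSummary("job-001", "Completed", 0.91),
    TrialSummary("job-002", "Failed", None),
    TrialSummary("job-003", "Completed", 0.94),
]
print(pick_best(trials).training_job_name)
```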

## Use model artifacts for batch inference
To run batch inference with the trained model, use a SageMaker Batch Transform job. A transform job requires a SageMaker Model, which specifies the container image and the model artifacts to use.

Below, we:
1. Create a SageMaker Model from the same managed XGBoost image used for training and the model artifacts produced by training. The managed XGBoost image supports inference as well as training.
2. Use that SageMaker Model in a Transform Job to run batch inference on the test dataset.
3. Compute performance metrics on the predictions.

More information about SageMaker Batch Transform can be found [here](https://docs.aws.amazon.com/sagemaker/latest/dg/batch-transform.html).

#### Create SageMaker Model

Create a Model resource from the model artifacts produced by the best training job found by the hyperparameter tuning job.

In [None]:
from sagemaker_core.resources import Model
from sagemaker_core.shapes import ContainerDefinition

# Alternative: use the model artifacts of the single training job run earlier.
# model_s3_uri = training_job.model_artifacts.s3_model_artifacts

# Get the S3 URI of the model artifacts of the best training job found by the tuning job.
model_s3_uri = TrainingJob.get(tuning_job.best_training_job.training_job_name).model_artifacts.s3_model_artifacts

# Create the SageMaker Model: the inference image plus the model artifacts to load.
customer_churn_model = Model.create(
    model_name='customer-churn-xgboost',
    primary_container=ContainerDefinition(
        image=image,
        model_data_url=model_s3_uri
    ),
    execution_role_arn=role
)

#### Create Transform Job

In [None]:
from sagemaker_core.resources import TransformJob
from sagemaker_core.shapes import TransformInput, TransformDataSource, TransformS3DataSource, TransformOutput, TransformResources

model_name = customer_churn_model.get_name()
transform_job_name = 'churn-prediction-' + time.strftime("%Y-%m-%d-%H-%M-%S", time.gmtime())  # Name of the transform job
s3_output_path = f"s3://{bucket}/transform"  # Bucket and optional prefix where the transform job stores its results
instance_type = 'ml.m4.xlarge'  # SageMaker instance type to use for the transform job
instance_count = 1  # Number of instances to use for the transform job

# Create Transform Job.
transform_job = TransformJob.create(
    transform_job_name=transform_job_name,
    model_name=model_name,
    transform_input=TransformInput(
        data_source=TransformDataSource(
            s3_data_source=TransformS3DataSource(
                s3_data_type="S3Prefix",
                s3_uri=s3_test_input
            )
        ),
        content_type="text/csv"
    ),
    transform_output=TransformOutput(
        s3_output_path=s3_output_path
    ),
    transform_resources=TransformResources(
        instance_type=instance_type,
        instance_count=instance_count
    )
)

transform_job.wait()
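
The next cell locates the transform results by naming convention: for each input object, Batch Transform writes a result object with the same file name plus `.out` under the output path. The sketch below shows that derivation, assuming the input URI points at a single object (as `s3_test_input` does here), plus an alternative way to split an S3 URI into bucket and key using the standard library URL parser. The bucket name is hypothetical, and `transform_output_uri` and `bucket_and_key` are illustrative helpers.

```python
from urllib.parse import urlparse


def transform_output_uri(input_uri: str, output_path: str) -> str:
    """S3 URI of the result for a single-object transform input: <file name>.out under output_path."""
    file_name = input_uri.rstrip("/").split("/")[-1] + ".out"
    return f"{output_path.rstrip('/')}/{file_name}"


def bucket_and_key(s3_uri: str):
    """Split s3://bucket/key into (bucket, key) with urllib's URL parser."""
    parsed = urlparse(s3_uri)
    if parsed.scheme != "s3":
        raise ValueError(f"Not an S3 URI: {s3_uri}")
    return parsed.netloc, parsed.path.lstrip("/")


# Hypothetical URIs; in the notebook these come from s3_test_input and s3_output_path.
out_uri = transform_output_uri("s3://example-bucket/data/test.csv", "s3://example-bucket/transform")
print(out_uri)
print(bucket_and_key(out_uri))
```

One caveat with `urlparse`: it treats `?` and `#` as URL delimiters, so keys containing those characters would be split incorrectly, whereas the plain string splitting used in the notebook's `split_s3_path` keeps them.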

#### Compute performance metrics

In [None]:
from sklearn.metrics import accuracy_score, precision_score, recall_score, roc_auc_score

# A transform job writes its results to the S3 output path, naming each output file after its input file with ".out" appended.
output_file_name = transform_job.transform_input.data_source.s3_data_source.s3_uri.split('/')[-1] + '.out'  # Get output file name
output_s3_uri = f"{transform_job.transform_output.s3_output_path}/{output_file_name}"  # Create output S3 URI

def split_s3_path(s3_path):
    '''Lightweight method for extracting bucket and object key from S3 uri'''
    path_parts = s3_path.replace("s3://", "").split("/")
    bucket = path_parts.pop(0)
    key = "/".join(path_parts)
    return bucket, key

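def print_performance_metrics(probs, y, threshold=0.5):
    '''Print accuracy, precision, recall and ROC AUC.

    probs: predicted churn probabilities, one per sample.
    y: true labels (1 = churn, 0 = no churn).
    threshold: probability at or above which a sample is classified as churn.
    '''

    predictions = (probs >= threshold).astype(int)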

    # Compare predictions with the stored target
    accuracy = accuracy_score(y, predictions)
    precision = precision_score(y, predictions)
    recall = recall_score(y, predictions)
    roc_auc = roc_auc_score(y, probs)

    print(f"Accuracy: {accuracy}")
    print(f"Precision: {precision}")
    print(f"Recall: {recall}")
    print(f"ROC AUC: {roc_auc}")


# Extract bucket and key separately from uri
res_bucket, res_key = split_s3_path(output_s3_uri)

# Download Transform Job results
transform_job_result = sagemaker_session.read_s3_file(res_bucket, res_key)
transform_job_result = pd.read_csv(StringIO(transform_job_result), header=None)

print_performance_metrics(transform_job_result, test_target_column)
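
With the `binary:logistic` objective the model outputs a churn probability rather than a class, so `print_performance_metrics` first applies a threshold. To make the reported numbers concrete, the sketch below recomputes accuracy, precision and recall by hand from confusion-matrix counts on a few hypothetical predictions. ROC AUC is left out because it is computed from the probabilities across all thresholds rather than from a single cut-off. `binary_metrics` is an illustrative helper, not a scikit-learn function.

```python
def binary_metrics(probs, labels, threshold=0.5):
    """Accuracy, precision and recall from thresholded probabilities."""
    tp = fp = tn = fn = 0
    for p, y in zip(probs, labels):
        pred = 1 if p >= threshold else 0
        if pred == 1 and y == 1:
            tp += 1  # churner correctly flagged
        elif pred == 1 and y == 0:
            fp += 1  # loyal customer wrongly flagged
        elif pred == 0 and y == 0:
            tn += 1  # loyal customer correctly passed
        else:
            fn += 1  # churner missed
    accuracy = (tp + tn) / len(labels)
    precision = tp / (tp + fp) if tp + fp else 0.0
    recall = tp / (tp + fn) if tp + fn else 0.0
    return {"accuracy": accuracy, "precision": precision, "recall": recall}


# Hypothetical model outputs and true labels.
probs = [0.9, 0.4, 0.7, 0.2, 0.6]
labels = [1, 1, 0, 0, 1]
print(binary_metrics(probs, labels))
print(binary_metrics(probs, labels, threshold=0.65))
```

Moving the threshold changes which customers are flagged: raising it typically trades recall for precision, which is worth tuning if missing a churner costs more than a false alarm, or vice versa.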

## Create SageMaker endpoint for real-time inference
To create a SageMaker endpoint, we first create an `EndpointConfig`. The endpoint configuration specifies which SageMaker model to deploy and the compute to host it on (here, an instance type and an initial instance count). We then use the `EndpointConfig`, together with other optional parameters, to create a SageMaker Endpoint.

More information about SageMaker Endpoints can be found [here](https://docs.aws.amazon.com/sagemaker/latest/dg/realtime-endpoints.html).


In [None]:
from sagemaker_core.resources import Endpoint, EndpointConfig
from sagemaker_core.shapes import ProductionVariant

endpoint_config_name = 'churn-prediction-endpoint-config'  # Name of endpoint configuration
model_name = customer_churn_model.get_name()  # Get name of SageMaker model created in previous step
endpoint_name = "customer-churn-endpoint"  # Name of SageMaker endpoint

endpoint_config = EndpointConfig.create(
    endpoint_config_name=endpoint_config_name,
    production_variants=[
        ProductionVariant(
            variant_name='AllTraffic',
            model_name=model_name,
            instance_type=instance_type,
            initial_instance_count=1
        )
    ]
)

sagemaker_endpoint = Endpoint.create(
    endpoint_name=endpoint_name,
    endpoint_config_name=endpoint_config.get_name(),
)

In [None]:
sagemaker_endpoint.wait_for_status(target_status='InService')  # Wait until the endpoint status is InService

#### Test live endpoint - with one sample


In [None]:
# Extract one sample payload and convert to string
sample = test_data.sample(1)
sample_payload = sample.to_csv(header=False, index=False).strip()

# Send sample payload to live endpoint and parse response
res = sagemaker_endpoint.invoke(body=sample_payload, content_type="text/csv")
result = res['Body'].read().decode('utf-8')
result

#### Test live endpoint - with entire test dataset

In [None]:
# Convert entire test dataset to CSV string
sample_payload = test_data.to_csv(header=False, index=False).strip()

# Send the whole test dataset to the live endpoint and parse the response (one prediction per line)
res = sagemaker_endpoint.invoke(body=sample_payload, content_type="text/csv")
result = res['Body'].read().decode('utf-8')
result = result.strip().split('\n')

# Compute performance metrics
df_result = pd.DataFrame(result).astype(float)
print_performance_metrics(df_result, test_target_column)
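
The two cells above rely on a simple text contract: the request is CSV with one row per line and no header or index, and the response is parsed as one prediction per line. The sketch below shows that round trip offline with a hypothetical feature frame and a hypothetical response string, so the serialization and parsing can be checked without a live endpoint. `parse_predictions` is an illustrative helper; it assumes the newline-separated response format that the notebook's own parsing assumes.

```python
import pandas as pd

# Hypothetical feature rows with the label column already removed, as in test_data.
features = pd.DataFrame({"f1": [1.0, 0.0], "f2": [3.5, 2.25]})

# One CSV line per row, no header and no index, trailing newline stripped.
payload = features.to_csv(header=False, index=False).strip()


def parse_predictions(body: str):
    """Parse one float per line; tolerates a missing or extra trailing newline."""
    return [float(line) for line in body.strip().splitlines() if line.strip()]


# Hypothetical response body, not real endpoint output.
response_body = "0.12\n0.87\n"

print(payload)
print(parse_predictions(response_body))
```

Using `strip()` before splitting avoids silently dropping the last prediction when a response happens not to end with a newline, which a fixed `[:-1]` slice would do.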

## Clean up

In [None]:
sagemaker_endpoint.delete()  # A running endpoint incurs charges until it's deleted
endpoint_config.delete()
customer_churn_model.delete()