# SageMakerCore Overview of Resource Level Abstractions - XGBoost Training Example

---
## Introductions
SageMakerCore is a Python SDK designed as a lightweight layer over boto3, the AWS SDK for Python. It is built on the concept of resource level abstractions, where SageMaker Resources are represented as Python classes. This approach enables SageMakerCore to simplify the management of SageMaker Resources and provide a more object-oriented programming interface.

### Resource Level Abstraction
Resource Level Abstractions can be best understood by examining how the AWS TrainingJob APIs are transfromed into a TrainingJob Python class abstraction in SageMakerCore.

For instance, an AWS TrainingJob has the following APIs:
1. CreateTrainingJob
2. DescribeTrainingJob
3. UpdateTrainingJob
4. StopTrainingJob
5. ListTrainingJobs

In SageMakerCore, these APIs are encapsulated within a TrainingJob class that exposes these operations as methods and attributes. The details of the TrainingJob class are below:

```python
class TrainingJob(Base):
    # Class attributes are mapped to describe_training_job response
    training_job_name: str
    training_job_arn: Optional[str] = Unassigned()
    tuning_job_arn: Optional[str] = Unassigned()
    labeling_job_arn: Optional[str] = Unassigned()
    auto_ml_job_arn: Optional[str] = Unassigned()
    model_artifacts: Optional[ModelArtifacts] = Unassigned()
    training_job_status: Optional[str] = Unassigned()
    ...

    @classmethod
    def create():       # Calls `create_training_job`

    @classmethod
    def get():          # Calls `describe_training_job`

    @classmethod
    def get_all():      # Calls `list_training_job`

    
    def update():       # Calls `update_training_job`


    def stop():         # Calls `stop_training_job`


    def refresh():      # Calls `describe_training_job` and refreshes instance attributes


    def wait():         # Calls `describe_training_job` and waits for TrainingJob to enter terminal state
```

## Comparing Boto3 and SageMakerCore SDKs

In this notebook, we create an AWS TrainingJob to train an XGBoost Container. We will be using both Boto3 and the SageMakerCore SDKs with the goal of highlighting and comparing the differences in user experience for performing operations such as creating, updating, waiting, and listing AWS TrainingJobs.

### Install Latest SageMakerCore
All SageMakerCore beta distributions will be released to a private s3 bucket. After being allowlisted, run the cells below to install the latest version of SageMakerCore from `s3://sagemaker-core-beta-artifacts/sagemaker_core-latest.tar.gz`

Ensure you are using a kernel with python version >=3.8

In [None]:
# Uninstall previous version of sagemaker_core and restart kernel
!pip uninstall sagemaker-core -y

In [None]:
# Install the latest version of sagemaker_core

!pip install sagemaker-core --upgrade

In [None]:
# Check the version of sagemaker_core
!pip show -v sagemaker-core

### Install Additional Packages

In [None]:
# Install additional packages

!pip install -U scikit-learn pandas boto3

### Setup

Let's start by specifying:
- AWS region.
- The IAM role arn used to give learning and hosting access to your data. Ensure your enviornment has AWS Credentials configured.
- The S3 bucket that you want to use for storing training and model data.

In [None]:
from sagemaker_core.helper.session_helper import Session, get_execution_role
from rich import print

# Get region, role, bucket

sagemaker_session = Session()
region = sagemaker_session.boto_region_name
role = get_execution_role()
bucket = sagemaker_session.default_bucket()
print(role)

### Load and Prepare Dataset
For this example, we will be using the IRIS data set from `sklearn.datasets` to train our XGBoost container.

In [None]:
from sklearn.datasets import load_iris
from sklearn.model_selection import train_test_split

import pandas as pd

# Get IRIS Data

iris = load_iris()
iris_df = pd.DataFrame(iris.data, columns=iris.feature_names)
iris_df['target'] = iris.target

In [None]:
import os

# Prepare Data

os.makedirs('./data', exist_ok=True)

iris_df = iris_df[['target'] + [col for col in iris_df.columns if col != 'target']]

train_data, test_data = train_test_split(iris_df, test_size=0.2, random_state=42)

train_data.to_csv('./data/train.csv', index=False, header=False)

### Upload Data to S3
In this step, we will upload the train and test data to the S3 bucket configured earlier using `sagemaker_session.default_bucket()`

In [None]:
# Upload Data

prefix = "DEMO-scikit-iris"
TRAIN_DATA = "train.csv"
DATA_DIRECTORY = "data"

train_input = sagemaker_session.upload_data(
    DATA_DIRECTORY, bucket=bucket, key_prefix="{}/{}".format(prefix, DATA_DIRECTORY)
)

s3_input_path = "s3://{}/{}/data/{}".format(bucket, prefix, TRAIN_DATA)
s3_output_path = "s3://{}/{}/output".format(bucket, prefix)

print(s3_input_path)
print(s3_output_path)

### Fetch the XGBoost Image URI
In this step, we will fetch the XGBoost Image URI we will use as an input parameter when creating an AWS TrainingJob

In [None]:
# Image name is hardcoded here
# Image name can be programatically got by using sagemaker package and calling image_uris.retrieve
# Since that is a high level abstraction that has multiple dependencies, the image URIs functionalities will live in sagemaker (V2)

image = "433757028032.dkr.ecr.us-west-2.amazonaws.com/xgboost:latest"

### Create TrainingJob with Boto3
With the necessary setup completed, we can now create an AWS TrainingJob. First we will begin by creating a TrainingJob with Boto3 to understand what the experience is like when interecting directly with low-level APIs through Boto3.

When executing the following cells there are a few things to note about the experience with Boto3:
1. Boto3 dynamically generates the API operation methods like `create_training_job`. When a client is instantiated, the methods are generated from the JSON service model description and are not statically coded into the boto3 library.
2. Boto3 returns a JSON response. As a result, users must either be familiar with the structure of these responses or refer to the documentation to parse them correctly.
3. Boto3 client methods expect keyword arguments. Similar to the experience with JSON response, users must be familiar with what keyword argumnets are expected or refer to the documentation to pass them correctly.


In [None]:
# Create TrainingJob with Boto3

import time
import boto3

client = boto3.client('sagemaker')
job_name_boto = 'xgboost-iris-' + time.strftime("%Y-%m-%d-%H-%M-%S", time.gmtime())

response = client.create_training_job(
    TrainingJobName=job_name_boto,
    HyperParameters={
        'objective': 'multi:softmax',
        'num_class': '3',
        'num_round': '10',
        'eval_metric': 'merror'
    },
    AlgorithmSpecification={
        'TrainingImage': image,
        'TrainingInputMode': 'File'
    },
    RoleArn=role,
    InputDataConfig=[
        {
            'ChannelName': 'train',
            'ContentType': 'csv',
            'DataSource': {
                'S3DataSource': {
                    'S3DataType': 'S3Prefix',
                    'S3Uri': s3_input_path,
                    'S3DataDistributionType': 'FullyReplicated'
                }
            },
            'CompressionType': 'None',
            'RecordWrapperType': 'None'
        }
    ],
    OutputDataConfig={
        'S3OutputPath': s3_output_path
    },
    ResourceConfig={
        'InstanceType': 'ml.m4.xlarge',
        'InstanceCount': 1,
        'VolumeSizeInGB': 30
    },
    StoppingCondition={
        'MaxRuntimeInSeconds': 600
    }
)
print(response)

### Wait for TrainingJob with Boto3
When a user creates a TrainingJob it is often the case that they would wish to wait on the TrainingJob to complete. Below is an example of how a user wait on a TrainingJob using Boto3. Notebly, this requires creating some logic to poll the TrainingJob using `describe_training_job` until the `TrainingJobStatus` is `'Failed'`, `'Completed'`, or `'Stopped'`.

In [None]:
# Wait for TrainingJob with Boto3
import time

while True:
    response = client.describe_training_job(TrainingJobName=job_name_boto)
    status = response['TrainingJobStatus']
    if status in ['Failed', 'Completed', 'Stopped']:
        if status == 'Failed':
            print(response['FailureReason'])
        break
    print("-", end="")
    time.sleep(5)

### Create TrainingJob with SageMakerCore
In this step we will use SageMakerCore to create a TrainingJob to understand what experience the object-oriented resource level abstractions provide for users.

When executing the following cells, there are a few things to note about the experience with SageMakerCore:
1. SageMakerCore generates Python classes and methods from the service model JSON, similar to Boto3. However, this generation is done prior to a release, resulting in a statically coded interface in the library.
2. SageMakerCore adopts an object-oriented approach, providing users with clear visibility of available methods and attributes through type hinting and IDE IntelliSense
3. Instead of returning JSON responses like Boto3, SageMakerCore returns objects. This allows users to access response attributes directly from the returned object, eliminating the need to parse JSON or refer to the documentation for structure details.

In [None]:
# Create TrainingJob with SageMakerCore

import time
from sagemaker_core.resources import TrainingJob, AlgorithmSpecification, Channel, DataSource, S3DataSource, \
    OutputDataConfig, ResourceConfig, StoppingCondition

job_name_v3 = 'xgboost-iris-' + time.strftime("%Y-%m-%d-%H-%M-%S", time.gmtime())

training_job = TrainingJob.create(
    training_job_name=job_name_v3,
    hyper_parameters={
        'objective': 'multi:softmax',
        'num_class': '3',
        'num_round': '10',
        'eval_metric': 'merror'
    },
    algorithm_specification=AlgorithmSpecification(
        training_image=image,
        training_input_mode='File'
    ),
    role_arn=role,
    input_data_config=[
        Channel(
            channel_name='train',
            content_type='csv',
            compression_type='None',
            record_wrapper_type='None',
            data_source=DataSource(
                s3_data_source=S3DataSource(
                    s3_data_type='S3Prefix',
                    s3_uri=s3_input_path,
                    s3_data_distribution_type='FullyReplicated'
                )
            )
        )
    ],
    output_data_config=OutputDataConfig(
        s3_output_path=s3_output_path
    ),
    resource_config=ResourceConfig(
        instance_type='ml.m4.xlarge',
        instance_count=1,
        volume_size_in_gb=30
    ),
    stopping_condition=StoppingCondition(
        max_runtime_in_seconds=600
    )
)

### Wait for TrainingJob with SageMakerCore
In SageMakerCore, the logic required to wait on a resource is abstracted away using a `wait()` method. As a result, a user can directly call the `wait()` method on a TrainingJob object instance like below. 

In [None]:
# Wait for TrainingJob with SageMakerCore

training_job.wait(logs=True)

## List TrainingJobs with Boto3
When a user lists TrainingJobs, there are 2 main approaches provided by Boto3. 

1. The first is calling `list_training_jobs` directly and implementing some logic to handle the NextToken provided in the response to enable pagination.
2. The second is by utilizing the Boto3 `get_paginator` method to get a paginator that encapsulates the NextToken and simplifies the logic required.

Both approaches are shown below. Although the boto3 provided paginator simplifies the logic over using a NextToken, in both cases the user must understand the structure of the list responses or refer to the docs (ie, understand to access TrainingJobSummaries by doing `response["TrainingJobSummaries"]`)


In [None]:
# List TrainingJobs with Boto3
import datetime
import boto3

client = boto3.client('sagemaker')

creation_time_after = datetime.datetime.now() - datetime.timedelta(days=1)

# List TrainingJobs with NextToken
next_token = None
while True:
    if next_token:
        response = client.list_training_jobs(CreationTimeAfter=creation_time_after, NextToken=next_token)
    else: 
        response = client.list_training_jobs(CreationTimeAfter=creation_time_after)
    
    for job in response['TrainingJobSummaries']:
        print(job['TrainingJobName'], job["TrainingJobStatus"])
        
    next_token = response.get('NextToken')
    
    if not next_token:
        break

In [None]:
import datetime
import boto3

client = boto3.client('sagemaker')
creation_time_after = datetime.datetime.now() - datetime.timedelta(days=1)

# List TrainingJobs with Boto3 Paginator
paginator = client.get_paginator('list_training_jobs')
for response in paginator.paginate(CreationTimeAfter=creation_time_after):
    for job in response['TrainingJobSummaries']:
        print(job['TrainingJobName'], job["TrainingJobStatus"])

## List TrainingJobs with SageMakerCore

In SageMakerCore, listing is done similar to the boto3 paginator approach but instead with a `ResourceIterator` which implements the python iterator protocol to instantiate and return resource objects only as they are accessed.


Below, is an example of how the `get_all()` method would be used to list TrainingJobs.

In [None]:
# List TrainingJobs with SageMakerCore
import datetime
from sagemaker_core.resources import TrainingJob

creation_time_after = datetime.datetime.now() - datetime.timedelta(days=1)

resource_iterator = TrainingJob.get_all(creation_time_after=creation_time_after)
for job in resource_iterator:
    print(job.training_job_name, job.training_job_status)

## Delete All SageMaker Resources
The following code block will call the delete() method for any SageMaker Core Resources created during the execution of this notebook which were assigned to local or global variables. If you created any additional deleteable resources without assigning the returning object to a unique variable, you will need to delete the resource manually by doing something like:

```python
resource = Resource.get("resource-name")
resource.delete()
```

In [None]:
# Delete any sagemaker core resource objects created in this notebook
def delete_all_sagemaker_resources():
    all_objects = list(locals().values()) + list(globals().values())
    deletable_objects = [obj for obj in all_objects if hasattr(obj, 'delete') and obj.__class__.__module__ == 'sagemaker_core.main.resources']
    
    for obj in deletable_objects:
        obj.delete()
        
delete_all_sagemaker_resources()