# SageMakerCore Inference, Async Inference, and Resource Chaining

---

## Introduction

In this notebook, we will walk through the process of performing inference using the SageMakerCore SDK. Additionally, this notebook shows how to create an endpoint using the Resource Chaining feature.



### Resource Chaining

Resource Chaining is a feature provided by SageMakerCore that aims to reduce the cognitive load for a user when performing operations with the SDK. The idea is to allow users to create an object, for example a `Model` resource object, and pass the object directly as a parameter to another resource such as `EndpointConfig`, instead of extracting and passing its name. An example of this chaining can be seen below:

```python
from time import gmtime, strftime

from sagemaker_core.resources import Model, EndpointConfig
from sagemaker_core.shapes import ProductionVariant

key = f'xgboost-iris-{strftime("%H-%M-%S", gmtime())}'

model = Model.create(...)  # Create model object

endpoint_config = EndpointConfig.create(
    endpoint_config_name=key,
    production_variants=[
        ProductionVariant(
            variant_name=key,
            initial_instance_count=1,
            instance_type='ml.m5.xlarge',
            model_name=model  # Pass model object directly
        )
    ]
)
```

## Pre-Requisites

### Install Latest SageMakerCore
Run the cells below to install the latest version of SageMakerCore from PyPI. SageMakerCore beta distributions are released to a private S3 bucket and require allowlisting; the latest beta is available at `s3://sagemaker-core-beta-artifacts/sagemaker_core-latest.tar.gz`.

Ensure you are using a kernel with Python version 3.8 or later.

In [None]:
# Uninstall previous version of sagemaker-core and restart kernel
!pip uninstall sagemaker-core -y

In [None]:
# Install the latest version of sagemaker-core

!pip install sagemaker-core --upgrade

In [None]:
# Check the version of sagemaker-core
!pip show -v sagemaker-core

### Install Additional Packages

In [None]:
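# Install additional packages
# sagemaker (the SageMaker Python SDK) provides the CSVSerializer and CSVDeserializer used later

!pip install -U scikit-learn pandas boto3 sagemaker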

### Setup

Let's start by specifying:
- AWS region.
- The IAM role ARN used to give training and hosting access to your data. Ensure your environment has AWS credentials configured.
- The S3 bucket that you want to use for storing training and model data.

In [None]:
from sagemaker_core.helper.session_helper import get_execution_role, Session
from rich import print

# Get region, role, bucket

sagemaker_session = Session()
region = sagemaker_session.boto_region_name
role = get_execution_role()
bucket = sagemaker_session.default_bucket()
print(role)

### Load and Prepare Dataset
For this example, we will use the Iris dataset from `sklearn.datasets` to train a model with the XGBoost container.

In [None]:
from sklearn.datasets import load_iris
from sklearn.model_selection import train_test_split

import pandas as pd

# Get IRIS Data

iris = load_iris()
iris_df = pd.DataFrame(iris.data, columns=iris.feature_names)
iris_df['target'] = iris.target

In [None]:
import os

# Prepare Data

os.makedirs('./data', exist_ok=True)

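# SageMaker's built-in XGBoost expects CSV input with the label in the first column and no header row
iris_df = iris_df[['target'] + [col for col in iris_df.columns if col != 'target']]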

train_data, test_data = train_test_split(iris_df, test_size=0.2, random_state=42)

train_data.to_csv('./data/train.csv', index=False, header=False)
test_data.to_csv('./data/test.csv', index=False, header=False)

# Remove the target column from the test data. We will use this to invoke the endpoint later
test_data_no_target = test_data.drop('target', axis=1)
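The SageMaker built-in XGBoost algorithm expects CSV training data with the target variable in the first column and no header row, which is why the cell above moves `target` to the front and writes the files with `header=False`. The same transformation can be sketched with only the standard library. `to_label_first_csv` is a hypothetical helper, and the sample values are illustrative.

```python
import csv
import io


def to_label_first_csv(rows, label_key):
    """Serialize dict rows as headerless CSV with the label column first (hypothetical helper)."""
    buffer = io.StringIO()
    writer = csv.writer(buffer, lineterminator="\n")
    for row in rows:
        # Keep the remaining columns in their original order after the label
        features = [value for key, value in row.items() if key != label_key]
        writer.writerow([row[label_key], *features])
    return buffer.getvalue()


rows = [
    {"sepal length (cm)": 5.1, "sepal width (cm)": 3.5, "target": 0},
    {"sepal length (cm)": 6.2, "sepal width (cm)": 2.9, "target": 1},
]
print(to_label_first_csv(rows, "target"))
```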

### Upload Data to S3
In this step, we will upload the train and test data to the S3 bucket configured earlier with `sagemaker_session.default_bucket()`.

In [None]:
# Upload Data

prefix = "DEMO-scikit-iris"
TRAIN_DATA = "train.csv"
DATA_DIRECTORY = "data"

train_input = sagemaker_session.upload_data(
    DATA_DIRECTORY, bucket=bucket, key_prefix="{}/{}".format(prefix, DATA_DIRECTORY)
)

s3_input_path = "s3://{}/{}/data/{}".format(bucket, prefix, TRAIN_DATA)
s3_output_path = "s3://{}/{}/output".format(bucket, prefix)

print(s3_input_path)
print(s3_output_path)

### Fetch the XGBoost Image URI
In this step, we will fetch the XGBoost image URI that we will pass as an input parameter when creating a SageMaker training job.

In [None]:
# The image URI is hardcoded here and points to the us-west-2 registry;
# adjust it if you run this notebook in a different region.
# It can also be retrieved programmatically with image_uris.retrieve() from the sagemaker package.
# Because that is a higher-level abstraction with multiple dependencies, image URI lookup lives in sagemaker (V2), not sagemaker-core.

image = "433757028032.dkr.ecr.us-west-2.amazonaws.com/xgboost:latest"

### Train XGBoost Image using IRIS Data

Next, we will use the SageMakerCore `TrainingJob.create()` method to start a training job for the XGBoost image using the Iris data, and then wait for it to complete.

In [None]:
# Create TrainingJob with SageMakerCore

import time
from sagemaker_core.resources import TrainingJob
from sagemaker_core.shapes import (AlgorithmSpecification, Channel, DataSource, S3DataSource,
                                   OutputDataConfig, ResourceConfig, StoppingCondition)

job_name_v3 = 'xgboost-iris-' + time.strftime("%Y-%m-%d-%H-%M-%S", time.gmtime())

training_job = TrainingJob.create(
    training_job_name=job_name_v3,
    hyper_parameters={
        'objective': 'multi:softmax',
        'num_class': '3',
        'num_round': '10',
        'eval_metric': 'merror'
    },
    algorithm_specification=AlgorithmSpecification(
        training_image=image,
        training_input_mode='File'
    ),
    role_arn=role,
    input_data_config=[
        Channel(
            channel_name='train',
            content_type='csv',
            compression_type='None',
            record_wrapper_type='None',
            data_source=DataSource(
                s3_data_source=S3DataSource(
                    s3_data_type='S3Prefix',
                    s3_uri=s3_input_path,
                    s3_data_distribution_type='FullyReplicated'
                )
            )
        )
    ],
    output_data_config=OutputDataConfig(
        s3_output_path=s3_output_path
    ),
    resource_config=ResourceConfig(
        instance_type='ml.m4.xlarge',
        instance_count=1,
        volume_size_in_gb=30
    ),
    stopping_condition=StoppingCondition(
        max_runtime_in_seconds=600
    )
)

In [None]:
training_job.wait()
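The `CreateTrainingJob` API takes `HyperParameters` as a map of string keys to string values, which is why the values in the training cell are quoted (`'3'`, `'10'`). If you prefer to define hyperparameters as native Python values, a small converter like the hypothetical `to_hyper_parameters` below can build the string map. Lowercasing booleans is an assumption of this sketch; check what your algorithm expects.

```python
def to_hyper_parameters(params):
    """Convert native Python values into a string-to-string hyperparameter map (hypothetical helper)."""
    converted = {}
    for name, value in params.items():
        if isinstance(value, bool):
            # Assumption: render booleans as "true"/"false" rather than "True"/"False"
            converted[name] = str(value).lower()
        else:
            converted[name] = str(value)
    return converted


hyper_parameters = to_hyper_parameters(
    {"objective": "multi:softmax", "num_class": 3, "num_round": 10, "eval_metric": "merror"}
)
print(hyper_parameters)
```

The resulting dictionary can be passed as `hyper_parameters=` to `TrainingJob.create()` in place of the hand-quoted literal.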

## Create Endpoint Using Resource Chaining

In the following cells, we will walk through the process of creating an endpoint using the Resource Chaining feature of SageMakerCore. Resource Chaining aims to reduce the cognitive load for a user by automatically resolving the necessary attributes when resource objects are chained together.

1. First, we will create a `Model` using the model data from the training job in the previous step.
2. We will create an `EndpointConfig` and pass the `Model` object directly as a parameter. SageMakerCore will automatically resolve the necessary attributes (here, the model name) from the `Model` object.
3. We will create an `Endpoint` using the `EndpointConfig` object as a parameter. SageMakerCore will automatically resolve the necessary attributes (here, the endpoint config name) from the `EndpointConfig` object.
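The toy sketch below illustrates the idea behind these steps: each `create` helper accepts either a plain name string or the upstream object, and resolves the name it needs. The `Toy*` classes and the helper functions are hypothetical stand-ins, not SageMakerCore's actual implementation, which handles many more attributes.

```python
from dataclasses import dataclass


# Hypothetical stand-ins for SageMakerCore resources, reduced to the fields this sketch needs
@dataclass
class ToyModel:
    model_name: str


@dataclass
class ToyEndpointConfig:
    endpoint_config_name: str
    model_name: str


@dataclass
class ToyEndpoint:
    endpoint_name: str
    endpoint_config_name: str


def resolve_name(value, attribute):
    """Return value unchanged if it is already a name, else read the named attribute from the object."""
    if isinstance(value, str):
        return value
    return getattr(value, attribute)


def create_endpoint_config(name, model):
    # Step 2: accept a model name or a model object
    return ToyEndpointConfig(name, resolve_name(model, "model_name"))


def create_endpoint(name, endpoint_config):
    # Step 3: accept an endpoint config name or an endpoint config object
    return ToyEndpoint(name, resolve_name(endpoint_config, "endpoint_config_name"))


toy_model = ToyModel("xgboost-iris-model")
toy_config = create_endpoint_config("xgboost-iris-config", toy_model)
toy_endpoint = create_endpoint("xgboost-iris-endpoint", toy_config)
print(toy_endpoint)
```

Accepting both forms keeps the string-based calls working while letting callers pass the object they already hold.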



### Create and Wait for Endpoint

Create a `Model` by specifying the `image` and `model_data_url`. For the `model_data_url` we will use the S3 path of the model output from the TrainingJob we performed previously.

Notice that we can set `model_data_url` directly by referencing `s3_model_artifacts` on the nested `ModelArtifacts` object attribute. This works because SageMakerCore exposes resource attributes as typed, nested objects.


Example class definitions:

```python
class TrainingJob(Base):
    ...
    model_artifacts: Optional[ModelArtifacts] = Unassigned()

class ModelArtifacts(Base):
    s3_model_artifacts: str
```


A user can then reference attributes for nested objects like:

```python
model_data_url = training_job.model_artifacts.s3_model_artifacts
```


In [None]:
from sagemaker_core.shapes import ContainerDefinition, ProductionVariant
from sagemaker_core.resources import Model, EndpointConfig, Endpoint
from time import gmtime, strftime

# Get model_data_url from training_job object
model_data_url = training_job.model_artifacts.s3_model_artifacts

key = f'xgboost-iris-{strftime("%H-%M-%S", gmtime())}'
print("Key:", key)

model = Model.create(
    model_name=key,
    primary_container=ContainerDefinition(
        image=image,
        model_data_url=model_data_url,
    ),
    execution_role_arn=role,
)

Create the `EndpointConfig` and `Endpoint`, then wait for the endpoint to be `InService`.

In [None]:
endpoint_config = EndpointConfig.create(
    endpoint_config_name=key,
    production_variants=[
        ProductionVariant(
            variant_name=key,
            initial_instance_count=1,
            instance_type='ml.m5.xlarge',
            model_name=model  # Pass the `Model` object created above
        )
    ]
)

endpoint: Endpoint = Endpoint.create(
    endpoint_name=key,
    endpoint_config_name=endpoint_config # Pass `EndpointConfig` object created above
)

In [None]:
endpoint.wait_for_status("InService")

### Endpoint Invoke

The cells below demonstrate how to invoke an endpoint synchronously in SageMakerCore using `endpoint.invoke()` or `endpoint.invoke_with_response_stream()`.

In these examples, we use CSV data both to train the model and to invoke the endpoint. We rely on the `CSVSerializer` and `CSVDeserializer` from the sagemaker-python-sdk to serialize the invoke input and deserialize the output.

In [None]:
from sagemaker.base_serializers import CSVSerializer
from sagemaker.deserializers import CSVDeserializer

deserializer = CSVDeserializer()
serializer = CSVSerializer()

invoke_result = endpoint.invoke(body=serializer.serialize(test_data_no_target),
                                content_type='text/csv',
                                accept='text/csv')

deserialized_result = deserializer.deserialize(invoke_result['Body'], invoke_result['ContentType'])

print("Endpoint Response:", deserialized_result)
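For this payload, the serializer produces headerless CSV text (values joined by commas, rows joined by newlines), and the deserializer parses the CSV response back into rows of strings. The standard-library sketch below approximates that behavior for plain lists of numbers; `serialize_csv` and `deserialize_csv` are hypothetical helpers, not the sagemaker-python-sdk implementation.

```python
import csv
import io


def serialize_csv(rows):
    """Join each row's values with commas and the rows with newlines, without a header."""
    return "\n".join(",".join(str(value) for value in row) for row in rows)


def deserialize_csv(body, encoding="utf-8"):
    """Parse a CSV response body (bytes) into a list of rows of strings."""
    return list(csv.reader(io.StringIO(body.decode(encoding))))


payload = serialize_csv([[5.1, 3.5, 1.4, 0.2], [6.2, 2.9, 4.3, 1.3]])
print(payload)
print(deserialize_csv(b"0.0\n1.0\n"))
```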

### Endpoint Invoke With Response Stream

In [None]:
def deserialize_stream(response):
    # Collect the raw events from the streamed response body
    return [
        res_part
        for res_part in response['Body']
    ]


invoke_result = endpoint.invoke_with_response_stream(body=serializer.serialize(test_data_no_target),
                                                     content_type='text/csv',
                                                     accept='text/csv')

print("Endpoint Stream Response:", deserialize_stream(invoke_result))
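With `InvokeEndpointWithResponseStream`, the response `Body` is an event stream in which each chunk of the payload arrives as an event of the form `{"PayloadPart": {"Bytes": b"..."}}`. The cell above prints those raw events; the sketch below joins the chunk bytes into a single payload instead. `collect_payload_bytes` is a hypothetical helper, and for brevity it skips any event that is not a `PayloadPart`.

```python
def collect_payload_bytes(events):
    """Concatenate the Bytes of every PayloadPart event in a response stream."""
    chunks = []
    for event in events:
        if "PayloadPart" in event:
            chunks.append(event["PayloadPart"]["Bytes"])
        # Other event types (for example, error events) are ignored in this sketch
    return b"".join(chunks)


# Simulated events in the shape returned by the response stream
events = [{"PayloadPart": {"Bytes": b"0.0\n"}}, {"PayloadPart": {"Bytes": b"1.0\n"}}]
print(collect_payload_bytes(events).decode("utf-8"))
```

In the notebook you would pass `invoke_result['Body']` from a fresh call, since an event stream typically can only be iterated once.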

## Create Endpoint for Async Invoke

Now that we have created and invoked an endpoint synchronously using SageMakerCore, the next section creates a new endpoint for asynchronous invocations and calls `endpoint.invoke_async()`.

### Download the Input Files and Pre-Trained Model tar.gz from S3

In [None]:
import boto3
import os

# Download the input files and the pre-trained model from the example bucket
os.makedirs('./input', exist_ok=True)
os.makedirs('./model', exist_ok=True)

example_bucket = f"sagemaker-example-files-prod-{region}"
s3 = boto3.client("s3")
for obj in s3.list_objects(
    Bucket=example_bucket, Prefix="models/async-inference/input-files/"
)["Contents"]:
    s3.download_file(example_bucket, obj["Key"], "input/" + obj["Key"].split("/")[-1])
s3.download_file(
    example_bucket,
    "models/async-inference/demo-xgboost-model.tar.gz",
    "model/demo-xgboost-model.tar.gz",
)

### Upload Data to S3
In this step, we will upload the input and model data to the S3 bucket configured earlier using `sagemaker_session.default_bucket()` and set the `model_url` variable that we will use to create a `Model` resource object.

In [None]:
# Upload the model to S3 bucket
bucket_prefix = "async-inference-demo"
bucket = sagemaker_session.default_bucket()

model_s3_key = f"{bucket_prefix}/demo-xgboost-model.tar.gz"
async_s3_output_path = f"s3://{bucket}/{bucket_prefix}/output"

model_url = sagemaker_session.upload_data("model/demo-xgboost-model.tar.gz", bucket, bucket_prefix)

print(f"Uploaded model to {model_url}")

### Create and Wait for Endpoint

Create a `Model` by specifying the `image` and `model_data_url`. For the `model_data_url` we will use the S3 path of the pretrained model uploaded previously.

In [None]:
key = f'xgboost-iris-{strftime("%H-%M-%S", gmtime())}'
print("Key:", key)

async_model = Model.create(
    model_name=key,
    primary_container=ContainerDefinition(
        image=image,
        model_data_url=model_url,
    ),
    execution_role_arn=role,
)

Create the `EndpointConfig` and `Endpoint`, then wait for the endpoint to be `InService`.

In [None]:
from sagemaker_core.shapes import ProductionVariant, AsyncInferenceConfig, AsyncInferenceOutputConfig, AsyncInferenceClientConfig

async_endpoint_config = EndpointConfig.create(
    endpoint_config_name=key,
    production_variants=[
        ProductionVariant(
            variant_name="variant1",
            model_name=async_model,
            instance_type='ml.m5.xlarge',
            initial_instance_count=1
        )
    ],
    async_inference_config=AsyncInferenceConfig(
        output_config=AsyncInferenceOutputConfig(s3_output_path=async_s3_output_path),
        client_config=AsyncInferenceClientConfig(
            max_concurrent_invocations_per_instance=4
        )
    )
)

async_endpoint = Endpoint.create(endpoint_name=key, endpoint_config_name=async_endpoint_config)

In [None]:
async_endpoint.wait_for_status("InService")

### Upload the Async Invoke Payload
To invoke an endpoint asynchronously, we first must upload the request payload to S3.

In [None]:
def upload_file(input_location):
    prefix = f"{bucket_prefix}/input"
    return sagemaker_session.upload_data(
        input_location,
        bucket=sagemaker_session.default_bucket(),
        key_prefix=prefix,
        extra_args={"ContentType": "text/libsvm"},
    )

input_path = "input/test_point_0.libsvm"
input_s3_path = upload_file(input_path)

### Endpoint Async Invoke

Call `endpoint.invoke_async()` with the S3 path of the request payload and store the `OutputLocation` from the response.

In [None]:
response = async_endpoint.invoke_async(input_location=input_s3_path)
output_location = response["OutputLocation"]
print(output_location)

### Check the Output Location
Check the output location from the `endpoint.invoke_async()` response to get the async inference results.

In [None]:
import time
import urllib.parse
from botocore.exceptions import ClientError


def get_output(output_location):
    # Split the s3://bucket/key output location into its bucket and key
    output_url = urllib.parse.urlparse(output_location)
    bucket = output_url.netloc
    key = output_url.path[1:]
    # Poll until the async inference result has been written to S3
    while True:
        try:
            return sagemaker_session.read_s3_file(bucket=bucket, key_prefix=key)
        except ClientError as e:
            if e.response["Error"]["Code"] == "NoSuchKey":
                print("waiting for output...")
                time.sleep(2)
                continue
            raise

In [None]:
output = get_output(output_location)
print(f"Output: {output}")
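The `get_output` loop polls forever at a fixed 2-second interval. A variant with a deadline and exponential backoff is sketched below. To keep it self-contained and testable, S3 access is abstracted behind a `fetch(bucket, key)` callable, and `OutputNotReady` is a hypothetical exception that such a callable would raise while the result object does not exist yet.

```python
import time
import urllib.parse


class OutputNotReady(Exception):
    """Hypothetical signal that the async inference result is not in S3 yet."""


def split_s3_uri(uri):
    """Split 's3://bucket/key' into (bucket, key)."""
    parsed = urllib.parse.urlparse(uri)
    if parsed.scheme != "s3":
        raise ValueError(f"Not an S3 URI: {uri}")
    return parsed.netloc, parsed.path.lstrip("/")


def poll_for_output(fetch, output_location, timeout_s=300.0,
                    initial_delay_s=2.0, max_delay_s=30.0, sleep=time.sleep):
    """Call fetch(bucket, key) until it succeeds, doubling the wait up to max_delay_s."""
    bucket, key = split_s3_uri(output_location)
    deadline = time.monotonic() + timeout_s
    delay = initial_delay_s
    while True:
        try:
            return fetch(bucket, key)
        except OutputNotReady:
            # Give up if the next wait would overshoot the deadline
            if time.monotonic() + delay > deadline:
                raise TimeoutError(f"No output at {output_location} after {timeout_s} s")
            sleep(delay)
            delay = min(delay * 2, max_delay_s)
```

In the notebook, `fetch` could wrap `sagemaker_session.read_s3_file(bucket=bucket, key_prefix=key)` and translate a `NoSuchKey` `ClientError` into `OutputNotReady`. Doubling the delay keeps S3 requests infrequent for long-running inferences while still returning quickly for short ones.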

## Delete All SageMaker Resources
The following code block calls the `delete()` method on every SageMakerCore resource object created during this notebook and assigned to a variable. If you created any additional deletable resources without assigning the returned object to a variable, you will need to delete them manually, for example:

```python
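from sagemaker_core.resources import Endpoint

# "my-endpoint" is a placeholder name; other resource classes such as Model and EndpointConfig follow the same get/delete pattern
endpoint = Endpoint.get("my-endpoint")
endpoint.delete()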
```

In [None]:
# Delete any SageMakerCore resource objects created in this notebook
def delete_all_sagemaker_resources():
    # Notebook-level variables live in globals(); locals() here would only see this function's scope
    seen_ids = set()
    for obj in list(globals().values()):
        if (
            hasattr(obj, 'delete')
            and obj.__class__.__module__ == 'sagemaker_core.main.resources'
            and id(obj) not in seen_ids
        ):
            # Track ids so an object referenced by several names is deleted only once
            seen_ids.add(id(obj))
            obj.delete()

delete_all_sagemaker_resources()
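If one `delete()` call raises, the loop above stops and leaves the remaining resources running. A hypothetical variant that deletes an explicit list of resources, skips duplicates, and collects failures instead of stopping is sketched below. It only assumes that each object has a `delete()` method.

```python
def delete_resources(resources):
    """Call delete() once per unique object and return (resource, exception) pairs for failures."""
    failures = []
    seen = set()
    for resource in resources:
        if id(resource) in seen:
            continue
        seen.add(id(resource))
        try:
            resource.delete()
        except Exception as exc:
            # Keep going so one failure does not leave the remaining resources running
            failures.append((resource, exc))
    return failures
```

For example, `delete_resources([async_endpoint, async_endpoint_config, async_model, endpoint, endpoint_config, model])` lists the endpoints before the configs and models they use; any returned failures can then be retried or cleaned up manually.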