example_notebooks/inference_and_resource_chaining.ipynb (788 lines of code) (raw):

{ "cells": [ { "cell_type": "markdown", "metadata": {}, "source": [ "# SageMakerCore Inference, Async Inference, and Resource Chaining" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "---" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Introductions\n", "\n", "In this notebook, we will walkthrough the process of performing Inference using the SageMakerCore SDK. Additionaly, this notebook will highlight how to create an endpoint using the Resource Chaining feature.\n", "\n" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Resource Chaining\n", "\n", "Resource Chaining is a feature provided by SageMakerCore that aims to reduce the cognitive load for a user when performing operations with the SDK. The idea is to allow users to create an object, for example a `Model` resource object, and pass the object directly as a parameter to some other resource like `EndpointConfig`. An example of this chaining can be seen below:\n", "\n", "```python\n", "key = f'xgboost-iris-{strftime(\"%H-%M-%S\", gmtime())}'\n", "\n", "model = Model.create(...) # Create model object\n", "\n", "endpoint_config = ndpointConfig.create(\n", " endpoint_config_name=key,\n", " production_variants=[\n", " ProductionVariant(\n", " variant_name=key,\n", " initial_instance_count=1,\n", " instance_type='ml.m5.xlarge',\n", " model_name=model # Pass model object directly\n", " )\n", " ]\n", ")\n", "```" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Pre-Requisites" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Install Latest SageMakerCore\n", "All SageMakerCore beta distributions will be released to a private s3 bucket. After being allowlisted, run the cells below to install the latest version of SageMakerCore from `s3://sagemaker-core-beta-artifacts/sagemaker_core-latest.tar.gz`\n", "\n", "Ensure you are using a kernel with python version >=3.8" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# Uninstall previous version of sagemaker-core and restart kernel\n", "!pip uninstall sagemaker-core -y" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# Install the latest version of sagemaker-core\n", "\n", "!pip install sagemaker-core --upgrade" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# Check the version of sagemaker-core\n", "!pip show -v sagemaker-core" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Install Additional Packages" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# Install additionall packages\n", "\n", "!pip install -U scikit-learn pandas boto3" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Setup\n", "\n", "Let's start by specifying:\n", "- AWS region.\n", "- The IAM role arn used to give learning and hosting access to your data. Ensure your enviornment has AWS Credentials configured.\n", "- The S3 bucket that you want to use for storing training and model data." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "from sagemaker_core.helper.session_helper import get_execution_role, Session\n", "from rich import print\n", "\n", "# Get region, role, bucket\n", "\n", "sagemaker_session = Session()\n", "region = sagemaker_session.boto_region_name\n", "role = get_execution_role()\n", "bucket = sagemaker_session.default_bucket()\n", "print(role)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Load and Prepare Dataset\n", "For this example, we will be using the IRIS data set from `sklearn.datasets` to train our XGBoost container." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "from sklearn.datasets import load_iris\n", "from sklearn.model_selection import train_test_split\n", "\n", "import pandas as pd\n", "\n", "# Get IRIS Data\n", "\n", "iris = load_iris()\n", "iris_df = pd.DataFrame(iris.data, columns=iris.feature_names)\n", "iris_df['target'] = iris.target" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "import os\n", "\n", "# Prepare Data\n", "\n", "os.makedirs('./data', exist_ok=True)\n", "\n", "iris_df = iris_df[['target'] + [col for col in iris_df.columns if col != 'target']]\n", "\n", "train_data, test_data = train_test_split(iris_df, test_size=0.2, random_state=42)\n", "\n", "train_data.to_csv('./data/train.csv', index=False, header=False)\n", "test_data.to_csv('./data/test.csv', index=False, header=False)\n", "\n", "# Remove the target column from the testing data. We will use this to call invoke_endpoint later\n", "test_data_no_target = test_data.drop('target', axis=1)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Upload Data to S3\n", "In this step, we will upload the train and test data to the S3 bucket configured earlier using `sagemaker_session.default_bucket()`" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# Upload Data\n", "\n", "prefix = \"DEMO-scikit-iris\"\n", "TRAIN_DATA = \"train.csv\"\n", "DATA_DIRECTORY = \"data\"\n", "\n", "train_input = sagemaker_session.upload_data(\n", " DATA_DIRECTORY, bucket=bucket, key_prefix=\"{}/{}\".format(prefix, DATA_DIRECTORY)\n", ")\n", "\n", "s3_input_path = \"s3://{}/{}/data/{}\".format(bucket, prefix, TRAIN_DATA)\n", "s3_output_path = \"s3://{}/{}/output\".format(bucket, prefix)\n", "\n", "print(s3_input_path)\n", "print(s3_output_path)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Fetch the XGBoost Image URI\n", "In this step, we will fetch the XGBoost Image URI we will use as an input parameter when creating an AWS TrainingJob" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# Image name is hardcoded here\n", "# Image name can be programatically got by using sagemaker package and calling image_uris.retrieve\n", "# Since that is a high level abstraction that has multiple dependencies, the image URIs functionalities will live in sagemaker (V2)\n", "\n", "image = \"433757028032.dkr.ecr.us-west-2.amazonaws.com/xgboost:latest\"" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Train XGBoost Image using IRIS Data\n", "\n", "Next, we will the SageMakerCore `TrainingJob.create()` to start a training job for an XGBoost Image using IRIS data and wait for it to complete." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# Create TrainingJob with SageMakerCore\n", "\n", "import time\n", "from sagemaker_core.resources import TrainingJob, AlgorithmSpecification, Channel, DataSource, S3DataSource, \\\n", " OutputDataConfig, ResourceConfig, StoppingCondition\n", "\n", "job_name_v3 = 'xgboost-iris-' + time.strftime(\"%Y-%m-%d-%H-%M-%S\", time.gmtime())\n", "\n", "training_job = TrainingJob.create(\n", " training_job_name=job_name_v3,\n", " hyper_parameters={\n", " 'objective': 'multi:softmax',\n", " 'num_class': '3',\n", " 'num_round': '10',\n", " 'eval_metric': 'merror'\n", " },\n", " algorithm_specification=AlgorithmSpecification(\n", " training_image=image,\n", " training_input_mode='File'\n", " ),\n", " role_arn=role,\n", " input_data_config=[\n", " Channel(\n", " channel_name='train',\n", " content_type='csv',\n", " compression_type='None',\n", " record_wrapper_type='None',\n", " data_source=DataSource(\n", " s3_data_source=S3DataSource(\n", " s3_data_type='S3Prefix',\n", " s3_uri=s3_input_path,\n", " s3_data_distribution_type='FullyReplicated'\n", " )\n", " )\n", " )\n", " ],\n", " output_data_config=OutputDataConfig(\n", " s3_output_path=s3_output_path\n", " ),\n", " resource_config=ResourceConfig(\n", " instance_type='ml.m4.xlarge',\n", " instance_count=1,\n", " volume_size_in_gb=30\n", " ),\n", " stopping_condition=StoppingCondition(\n", " max_runtime_in_seconds=600\n", " )\n", ")" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "training_job.wait()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Create Endpoint Using Resource Chaining\n", "\n", "In the following cells, we will walkthrough the process of creating an Endpoint using the Resource Chaining feature of SageMakerCore. Resource Chaining aims to reduce the cognitive load for a user by autoresolving necessary attributes when chaing resource objects together during operations.\n", "\n", "1. First, we will create a `Model` using the model data from the training job in the previous step.\n", "2. We will create an `EndpointConfig` and pass the `Model` object directly as a parameter. SageMakerCore will autoresolve the neccessary attributes from the `Model` object.\n", "3. We will create an `Endpoint` using the `EndpointConfig` object as a parameter. SageMakerCore will autoresolve the neccessary attributes from the `EndpointConfig` object.\n", "\n" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Create and Wait for Endpoint" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Create a `Model` by specifying the `image` and `model_data_url`. For the `model_data_url` we will use the S3 path of the model output from the TrainingJob we performed previously.\n", "\n", "Notice that we are able to set the `model_data_url` directly by referencing the `s3_model_artifacts` from the nested `ModelArtifacts` object attribute. This is possible due to SageMakerCore's object-oriented programming experience. \n", "\n", "\n", "Class Definitions example:\n", "\n", "```python\n", "class TrainingJob(Base):\n", " ...\n", " model_artifacts: Optional[ModelArtifacts] = Unassigned()\n", "\n", "class ModelArtifacts(Base):\n", " s3_model_artifacts: str\n", "```\n", "\n", "\n", "A user can then reference attributes for nested objects like:\n", "\n", "```python\n", "model_data_url = training_job.model_artifacts.s3_model_artifacts\n", "```\n" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "from sagemaker_core.shapes import ContainerDefinition, ProductionVariant\n", "from sagemaker_core.resources import Model, EndpointConfig, Endpoint\n", "from time import gmtime, strftime\n", "\n", "# Get model_data_url from training_job object\n", "model_data_url = training_job.model_artifacts.s3_model_artifacts\n", "\n", "key = f'xgboost-iris-{strftime(\"%H-%M-%S\", gmtime())}'\n", "print(\"Key:\", key)\n", "\n", "model = Model.create(\n", " model_name=key,\n", " primary_container=ContainerDefinition(\n", " image=image,\n", " model_data_url=model_data_url,\n", " ),\n", " execution_role_arn=role,\n", ")" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Create the `Endpoint` and wait for it to be `InService`" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "endpoint_config = EndpointConfig.create(\n", " endpoint_config_name=key,\n", " production_variants=[\n", " ProductionVariant(\n", " variant_name=key,\n", " initial_instance_count=1,\n", " instance_type='ml.m5.xlarge',\n", " model_name=model # Pass `Model`` object created above\n", " )\n", " ]\n", ")\n", "\n", "endpoint: Endpoint = Endpoint.create(\n", " endpoint_name=key,\n", " endpoint_config_name=endpoint_config # Pass `EndpointConfig` object created above\n", ")" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "endpoint.wait_for_status(\"InService\")" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Endpoint Invoke\n", "\n", "The below cells demonstrates how an endpoint would be invoked synchronously in SageMakerCore using `endpoint.invoke()` or `endpoint.inoke_with_response_stream()`. \n", "\n", "In these examples, we are using CSV data to train the model and to invoking the endpoint. We will rely on the `CSVSerializer` and `CSVDeserializer` from the the sagemaker-python-sdk to assist with serilizing and deserializing the invoke input and output." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "from sagemaker.base_serializers import CSVSerializer\n", "from sagemaker.deserializers import CSVDeserializer\n", "\n", "deserializer = CSVDeserializer()\n", "serializer = CSVSerializer()\n", "\n", "invoke_result = endpoint.invoke(body=serializer.serialize(test_data_no_target),\n", " content_type='text/csv',\n", " accept='text/csv')\n", "\n", "deserialized_result = deserializer.deserialize(invoke_result['Body'], invoke_result['ContentType'])\n", "\n", "print(\"Endpoint Response:\", deserialized_result)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Endpoint Invoke With Response Stream" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "def deserialise(response):\n", " return [\n", " res_part\n", " for res_part in response['Body']\n", " ]\n", "\n", "\n", "invoke_result = endpoint.invoke_with_response_stream(body=serializer.serialize(test_data_no_target),\n", " content_type='text/csv',\n", " accept='application/csv')\n", "\n", "print(\"Endpoint Stream Response:\", deserialise(invoke_result))" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Create Endpoint for Async Invoke\n", "\n", "Now that we have gone through the process of creating and invoking endpoint synchronously using SageMakerCore. In the next section, we will create a new endpoint for asynchronous invocations and call `endpoint.invoke_async()`." ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Download the Input Files and Pre-Trained Model tar.gz from S3" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "import boto3\n", "import os\n", "\n", "# Download the Input files and model from S3 bucket\n", "os.makedirs('./input', exist_ok=True)\n", "os.makedirs('./model', exist_ok=True)\n", "\n", "s3 = boto3.client(\"s3\")\n", "for key in s3.list_objects(\n", " Bucket=f\"sagemaker-example-files-prod-{region}\", Prefix=\"models/async-inference/input-files/\"\n", ")[\"Contents\"]:\n", " s3.download_file(\n", " f\"sagemaker-example-files-prod-{region}\", key[\"Key\"], \"input/\" + key[\"Key\"].split(\"/\")[-1]\n", " )\n", "s3.download_file(\n", " f\"sagemaker-example-files-prod-{region}\",\n", " \"models/async-inference/demo-xgboost-model.tar.gz\",\n", " \"model/demo-xgboost-model.tar.gz\",\n", ")" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Upload Data to S3\n", "In this step, we will upload the input and model data to the S3 bucket configured earlier using `sagemaker_session.default_bucket()` and set the `model_url` variable that we will use to create a `Model` resource object." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# Upload the model to S3 bucket\n", "bucket_prefix = \"async-inference-demo\"\n", "bucket = sagemaker_session.default_bucket()\n", "\n", "model_s3_key = f\"{bucket_prefix}/demo-xgboost-model.tar.gz\"\n", "async_s3_output_path = f\"s3://{bucket}/{bucket_prefix}/output\"\n", "\n", "model_url = sagemaker_session.upload_data(\"model/demo-xgboost-model.tar.gz\", bucket, bucket_prefix)\n", "\n", "print(f\"Uploading Model to {model_url}\")" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Create and Wait for Endpoint" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Create a `Model` by specifying the `image` and `model_data_url`. For the `model_data_url` we will use the S3 path of the pretrained model uploaded previously." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "key = f'xgboost-iris-{strftime(\"%H-%M-%S\", gmtime())}'\n", "print(\"Key:\", key)\n", "\n", "async_model = Model.create(\n", " model_name=key,\n", " primary_container=ContainerDefinition(\n", " image=image,\n", " model_data_url=model_url,\n", " ),\n", " execution_role_arn=role,\n", ")" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Create the `Endpoint` and wait for it to be `InService`" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "from sagemaker_core.shapes import ProductionVariant, AsyncInferenceConfig, AsyncInferenceOutputConfig, AsyncInferenceClientConfig\n", "\n", "async_endpoint_config = EndpointConfig.create(\n", " endpoint_config_name=key,\n", " production_variants=[\n", " ProductionVariant(\n", " variant_name=\"variant1\",\n", " model_name=async_model,\n", " instance_type='ml.m5.xlarge',\n", " initial_instance_count=1\n", " )\n", " ],\n", " async_inference_config=AsyncInferenceConfig(\n", " output_config=AsyncInferenceOutputConfig(s3_output_path=async_s3_output_path),\n", " client_config=AsyncInferenceClientConfig(\n", " max_concurrent_invocations_per_instance=4\n", " )\n", " )\n", ")\n", "\n", "async_endpoint = Endpoint.create(endpoint_name=key, endpoint_config_name=async_endpoint_config)" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "async_endpoint.wait_for_status(\"InService\")" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Upload the Async Invoke Payload\n", "To invoke an endpoint asynchronously, we first must upload the request payload to S3." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "def upload_file(input_location):\n", " prefix = f\"{bucket_prefix}/input\"\n", " return sagemaker_session.upload_data(\n", " input_location,\n", " bucket=sagemaker_session.default_bucket(),\n", " key_prefix=prefix,\n", " extra_args={\"ContentType\": \"text/libsvm\"},\n", " )\n", "\n", "input_path = \"input/test_point_0.libsvm\"\n", "input_s3_path = upload_file(input_path)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Endpoint Async Invoke\n", "\n", "Call `endpoint.invoke_async()` using the s3 path of the invoke request payload and store the \"OutputLocation\" of from the response." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "response = async_endpoint.invoke_async(input_location=input_s3_path)\n", "output_location = response[\"OutputLocation\"]\n", "print(output_location)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Check the Output Location\n", "Check the output location from the `endpoint.invoke_async()` response to get the async inference results." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "import urllib, time\n", "from botocore.exceptions import ClientError\n", "\n", "\n", "def get_output(output_location):\n", " output_url = urllib.parse.urlparse(output_location)\n", " bucket = output_url.netloc\n", " key = output_url.path[1:]\n", " while True:\n", " try:\n", " return sagemaker_session.read_s3_file(bucket=output_url.netloc, key_prefix=output_url.path[1:])\n", " except ClientError as e:\n", " if e.response[\"Error\"][\"Code\"] == \"NoSuchKey\":\n", " print(\"waiting for output...\")\n", " time.sleep(2)\n", " continue\n", " raise" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "output = get_output(output_location)\n", "print(f\"Output: {output}\")" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Delete All SageMaker Resources\n", "The following code block will call the delete() method for any SageMaker Core Resources created during the execution of this notebook which were assigned to local or global variables. If you created any additional deleteable resources without assigning the returning object to a unique variable, you will need to delete the resource manually by doing something like:\n", "\n", "```python\n", "resource = Resource.get(\"resource-name\")\n", "resource.delete()\n", "```" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# Delete any sagemaker core resource objects created in this notebook\n", "def delete_all_sagemaker_resources():\n", " all_objects = list(locals().values()) + list(globals().values())\n", " deletable_objects = [obj for obj in all_objects if hasattr(obj, 'delete') and obj.__class__.__module__ == 'sagemaker_core.main.resources']\n", " \n", " for obj in deletable_objects:\n", " obj.delete()\n", " \n", "delete_all_sagemaker_resources()" ] } ], "metadata": { "kernelspec": { "display_name": "Python 3 (ipykernel)", "language": "python", "name": "python3" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 3 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.10.14" } }, "nbformat": 4, "nbformat_minor": 4 }