sdk/python/featurestore_sample/notebooks/sdk_and_cli/3.Enable-recurrent-materialization-run-batch-inference.ipynb

{ "cells": [ { "attachments": {}, "cell_type": "markdown", "metadata": { "nteract": { "transient": { "deleting": false } } }, "source": [ "# Tutorial #3: Enable recurrent materialization and run batch inference" ] }, { "attachments": {}, "cell_type": "markdown", "metadata": { "nteract": { "transient": { "deleting": false } } }, "source": [ "In this tutorial series you will experience how features seamlessly integrate all phases of the ML lifecycle: prototyping features, training, and operationalizing.\n", "\n", "In the previous part of the tutorial you learned how to experiment with features, train the model, and register the model along with the feature-retrieval spec. In this tutorial you will learn how to run batch inference for the registered model.\n", "\n", "You will perform the following:\n", "- Enable recurrent materialization for the `transactions` feature set\n", "- Run the batch inference pipeline on the registered model" ] }, { "attachments": {}, "cell_type": "markdown", "metadata": { "nteract": { "transient": { "deleting": false } } }, "source": [ "# Prerequisites\n", "1. Please ensure you have executed the previous parts of this tutorial series" ] }, { "attachments": {}, "cell_type": "markdown", "metadata": { "nteract": { "transient": { "deleting": false } } }, "source": [ "# Setup" ] }, { "attachments": {}, "cell_type": "markdown", "metadata": { "nteract": { "transient": { "deleting": false } } }, "source": [ "#### (updated) Configure Azure ML spark notebook\n", "\n", "1. Running the tutorial: You can either create a new notebook and execute the instructions in this document step by step, or open the existing notebook named `Enable recurrent materialization and run batch inference` and run it. The notebooks are available in the `featurestore_sample/notebooks` directory. You can select from `sdk_only` or `sdk_and_cli`. You may keep this document open and refer to it for additional explanation and documentation links.\n", "1. 
In the \"Compute\" dropdown in the top nav, select \"Serverless Spark Compute\".\n", "1. Click on \"configure session\" in the top status bar -> click on \"Python packages\" -> click on \"upload conda file\" -> select the file azureml-examples/sdk/python/featurestore_sample/project/env/conda.yml from your local machine. Also increase the session timeout (idle time) if you want to avoid running the prerequisites frequently\n" ] }, { "attachments": {}, "cell_type": "markdown", "metadata": { "nteract": { "transient": { "deleting": false } } }, "source": [ "#### Start spark session" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "gather": { "logged": 1697159743790 }, "jupyter": { "outputs_hidden": false, "source_hidden": false }, "name": "start-spark-session", "nteract": { "transient": { "deleting": false } } }, "outputs": [], "source": [ "# run this cell to start the spark session (any code block will start the session). This can take around 10 mins.\n", "print(\"start spark session\")" ] }, { "attachments": {}, "cell_type": "markdown", "metadata": { "nteract": { "transient": { "deleting": false } } }, "source": [ "#### Setup root directory for the samples" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "gather": { "logged": 1697159769987 }, "jupyter": { "outputs_hidden": false, "source_hidden": false }, "name": "root-dir", "nteract": { "transient": { "deleting": false } } }, "outputs": [], "source": [ "import os\n", "\n", "# please update the dir to ./Users/<your_user_alias> (or any custom directory you uploaded the samples to).\n", "# You can find the name from the directory structure in the left nav\n", "root_dir = \"./Users/<your_user_alias>/featurestore_sample\"\n", "\n", "if os.path.isdir(root_dir):\n", " print(\"The folder exists.\")\n", "else:\n", " print(\"The folder does not exist. 
Please create or fix the path\")" ] }, { "attachments": {}, "cell_type": "markdown", "metadata": { "nteract": { "transient": { "deleting": false } } }, "source": [ "#### (new for sdk/cli track) Setup CLI\n", "\n", "1. Install the Azure ML CLI extension\n", "1. Authenticate\n", "1. Set the default subscription" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "jupyter": { "outputs_hidden": false, "source_hidden": false }, "name": "install-ml-ext-cli", "nteract": { "transient": { "deleting": false } }, "tags": [ "active-ipynb" ] }, "outputs": [], "source": [ "!az extension add --name ml" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "jupyter": { "outputs_hidden": true, "source_hidden": false }, "name": "auth-cli", "nteract": { "transient": { "deleting": false } }, "tags": [ "active-ipynb" ] }, "outputs": [], "source": [ "!az login" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "gather": { "logged": 1697159883337 }, "jupyter": { "outputs_hidden": false, "source_hidden": false }, "name": "set-default-subs-cli", "nteract": { "transient": { "deleting": false } }, "tags": [ "active-ipynb" ] }, "outputs": [], "source": [ "import os\n", "\n", "subscription_id = os.environ[\"AZUREML_ARM_SUBSCRIPTION\"]\n", "\n", "!az account set -s $subscription_id" ] }, { "attachments": {}, "cell_type": "markdown", "metadata": { "nteract": { "transient": { "deleting": false } } }, "source": [ "#### Initialize the project workspace CRUD client\n", "This is the current workspace, from which you run the tutorial notebook" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "gather": { "logged": 1697159897063 }, "jupyter": { "outputs_hidden": false, "source_hidden": false }, "name": "init-ws-crud-client", "nteract": { "transient": { "deleting": false } } }, "outputs": [], "source": [ "### Initialize the MLClient of this project workspace\n", "import os\n", "from azure.ai.ml import MLClient\n", "from 
azure.ai.ml.identity import AzureMLOnBehalfOfCredential\n", "\n", "project_ws_sub_id = os.environ[\"AZUREML_ARM_SUBSCRIPTION\"]\n", "project_ws_rg = os.environ[\"AZUREML_ARM_RESOURCEGROUP\"]\n", "project_ws_name = os.environ[\"AZUREML_ARM_WORKSPACE_NAME\"]\n", "\n", "# connect to the project workspace\n", "ws_client = MLClient(\n", " AzureMLOnBehalfOfCredential(), project_ws_sub_id, project_ws_rg, project_ws_name\n", ")" ] }, { "attachments": {}, "cell_type": "markdown", "metadata": { "nteract": { "transient": { "deleting": false } } }, "source": [ "#### Initialize the feature store variables\n", "Ensure you update the `featurestore_name` to reflect what you created in part 1 of this tutorial" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "gather": { "logged": 1697159943735 }, "jupyter": { "outputs_hidden": false, "source_hidden": false }, "name": "init-fs-crud-client", "nteract": { "transient": { "deleting": false } } }, "outputs": [], "source": [ "featurestore_name = (\n", " \"<FEATURESTORE_NAME>\" # use the same name from part #1 of the tutorial\n", ")\n", "featurestore_subscription_id = os.environ[\"AZUREML_ARM_SUBSCRIPTION\"]\n", "featurestore_resource_group_name = os.environ[\"AZUREML_ARM_RESOURCEGROUP\"]" ] }, { "attachments": {}, "cell_type": "markdown", "metadata": { "nteract": { "transient": { "deleting": false } } }, "source": [ "#### Initialize the feature store core sdk client" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "gather": { "logged": 1697159952751 }, "jupyter": { "outputs_hidden": false, "source_hidden": false }, "name": "init-fs-core-sdk", "nteract": { "transient": { "deleting": false } } }, "outputs": [], "source": [ "# feature store client\n", "from azureml.featurestore import FeatureStoreClient\n", "from azure.ai.ml.identity import AzureMLOnBehalfOfCredential\n", "\n", "featurestore = FeatureStoreClient(\n", " credential=AzureMLOnBehalfOfCredential(),\n", " 
subscription_id=featurestore_subscription_id,\n", " resource_group_name=featurestore_resource_group_name,\n", " name=featurestore_name,\n", ")" ] }, { "attachments": {}, "cell_type": "markdown", "metadata": { "nteract": { "transient": { "deleting": false } } }, "source": [ "## Step 1: Enable recurrent materialization on the `transactions` feature set\n", "\n", "In part 2 of this tutorial you enabled materialization and performed a backfill on the transactions feature set. Backfill is an on-demand, one-time operation that computes and stores feature values in the materialization store. However, to run inference on the model in production, you might want to keep the materialization store up to date by setting up recurrent materialization jobs. These jobs run on a user-defined schedule.\n", "\n", "The recurrent job schedule works in the following way:\n", "- A window is defined by the interval and frequency. E.g., interval = 3 and frequency = Hour define a 3-hour window\n", "- The first window starts at the `start_time` defined in the `RecurrenceTrigger`, and each later window follows the previous one.\n", "- The first recurrent job will be submitted at the beginning of the next window after the update time.\n", "- Later recurrent jobs will be submitted at every window after the first job.\n", "\n", "As explained in the previous parts of the tutorial, once data is materialized (backfill/recurrent materialization), feature retrieval will use the materialized data by default." 
] }, { "cell_type": "code", "execution_count": null, "metadata": { "gather": { "logged": 1697160005186 }, "jupyter": { "outputs_hidden": true, "source_hidden": false }, "name": "enable-recurrent-mat-txns-fset", "nteract": { "transient": { "deleting": false } } }, "outputs": [], "source": [ "feature_set_schedule_yaml = (\n", " root_dir\n", " + \"/featurestore/featuresets/transactions/featureset_asset_offline_enabled_with_schedule.yaml\"\n", ")\n", "\n", "!az ml feature-set update --file $feature_set_schedule_yaml --resource-group $featurestore_resource_group_name --feature-store-name $featurestore_name" ] }, { "attachments": {}, "cell_type": "markdown", "metadata": { "nteract": { "transient": { "deleting": false } } }, "source": [ "#### Track status of the recurrent materialization jobs in the feature store studio UI\n", "This job will run every three hours.\n", "\n", "__Action__:\n", "\n", "1. Feel free to execute the next step for now (batch inference).\n", "1. In three hours, check the recurrent job status via the UI" ] }, { "attachments": {}, "cell_type": "markdown", "metadata": { "nteract": { "transient": { "deleting": false } } }, "source": [ "## Step 2: Run the batch-inference pipeline\n", "\n", "In this step you will manually trigger the batch inference pipeline. In a production scenario, this could be triggered by a CI/CD pipeline based on model registration/approval.\n", "\n", "The batch-inference pipeline has the following steps:\n", "\n", "1. Feature retrieval step: This step uses the same built-in feature retrieval component that we used in the training pipeline in part 3 of the tutorial. In the training pipeline, we provided the feature retrieval spec as an input to the component. In batch inference, however, we pass the registered model as the input, and the component looks for the feature retrieval spec in the model artifact. 
Another difference is that in training the observation data had the target variable, whereas in batch inference it is not present. The feature retrieval step will join the observation data with the features and output the data for batch inference.\n", "1. Batch inference: This step uses the batch inference input data from the previous step, runs inference on the model, and outputs the data with the predicted value appended.\n", "\n", "__Note:__ In this example we use a job for batch inference. You can also use Azure ML's batch endpoints." ] }, { "cell_type": "code", "execution_count": null, "metadata": { "gather": { "logged": 1697160098403 }, "jupyter": { "outputs_hidden": false, "source_hidden": false }, "name": "run-batch-inf-pipeline", "nteract": { "transient": { "deleting": false } } }, "outputs": [], "source": [ "# set the batch inference pipeline path\n", "batch_inference_pipeline_path = (\n", " root_dir + \"/project/fraud_model/pipelines/batch_inference_pipeline.yaml\"\n", ")\n", "\n", "!az ml job create --file $batch_inference_pipeline_path --resource-group $project_ws_rg --workspace-name $project_ws_name" ] }, { "attachments": {}, "cell_type": "markdown", "metadata": { "nteract": { "transient": { "deleting": false } } }, "source": [ "#### Inspect the batch inference output data\n", "1. In the pipeline view, double click on `inference_step` -> in the `outputs` card, copy the `Data` field. It will be something like `azureml_995abbc2-3171-461e-8214-c3c5d17ede83_output_data_data_with_prediction:1`.\n", "1. Paste it into the cell below, with the name and version as separate arguments (notice that the last character is the version, separated by a `:`).\n", "1. 
You will see the `predict_is_fraud` column generated by the batch inference pipeline\n", "\n", "Explanation: Since we did not provide a `name` and `version` in the `outputs` of the `inference_step` in the batch inference pipeline (`/project/fraud_model/pipelines/batch_inference_pipeline.yaml`), the system created an untracked data asset with a GUID as its name and `1` as its version. In the next cell we get the data path from the asset and display the data." ] }, { "cell_type": "code", "execution_count": null, "metadata": { "gather": { "logged": 1684255304980 }, "jupyter": { "outputs_hidden": false, "source_hidden": false }, "name": "inspect-batch-inf-output-data", "nteract": { "transient": { "deleting": false } }, "tags": [ "active-ipynb" ] }, "outputs": [], "source": [ "# inf_data_output = ws_client.data.get(name=\"azureml_1c106662-aa5e-4354-b5f9-57c1b0fdb3a7_output_data_data_with_prediction\", version=\"1\")\n", "inf_data_output = ws_client.data.get(\n", " name=\"azureml_0a7417c8-409a-4536-a069-4ea23a08ebfe_output_data_data_with_prediction\",\n", " version=\"1\",\n", ")\n", "inf_output_df = spark.read.parquet(inf_data_output.path + \"data/*.parquet\")\n", "display(inf_output_df.head(5))" ] }, { "attachments": {}, "cell_type": "markdown", "metadata": { "nteract": { "transient": { "deleting": false } } }, "source": [ "## Cleanup\n", "If you created a resource group for the tutorial, you can delete the resource group to delete all the resources associated with this tutorial.\n", "\n", "Otherwise, you can delete the resources individually:\n", "\n", "* Delete the feature store: Go to the resource group in the azure portal, select the feature store and delete it\n", "* Follow the instructions [here](https://review.learn.microsoft.com/en-us/azure/active-directory/managed-identities-azure-resources/how-manage-user-assigned-managed-identities?pivots=identity-mi-methods-azp&view=azureml-api-2#delete-a-user-assigned-managed-identity) to delete the user assigned managed 
identity\n", "* Delete the offline store (storage account): Go to the resource group in the azure portal, select the storage you created and delete it" ] } ], "metadata": { "celltoolbar": "Edit Metadata", "kernel_info": { "name": "synapse_pyspark" }, "kernelspec": { "display_name": "Synapse PySpark", "language": "Python", "name": "synapse_pyspark" }, "language_info": { "codemirror_mode": "ipython", "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython", "version": "3.8.0" }, "microsoft": { "host": { "AzureML": { "notebookHasBeenCompleted": true } }, "ms_spell_check": { "ms_spell_check_language": "en" } }, "nteract": { "version": "nteract-front-end@1.0.0" } }, "nbformat": 4, "nbformat_minor": 2 }