sdk/python/featurestore_sample/notebooks/sdk_and_cli/1.Develop-feature-set-and-register.ipynb (1,304 lines of code) (raw):
{
"cells": [
{
"attachments": {},
"cell_type": "markdown",
"metadata": {
"nteract": {
"transient": {
"deleting": false
}
}
},
"source": [
"# Tutorial #1: Develop a feature set and register with managed feature store"
]
},
{
"attachments": {},
"cell_type": "markdown",
"metadata": {
"nteract": {
"transient": {
"deleting": false
}
}
},
"source": [
"Azure ML managed feature store lets you discover, create and operationalize features. Features are the connective tissue in ML lifecycle, starting from prototyping where you experiment with various features to operationalization where models are deployed and feature data is looked up during inference. For information on basics concept of feature store, see [feature store concepts](https://learn.microsoft.com/azure/machine-learning/concept-what-is-managed-feature-store).\n",
"\n",
"In this tutorial series you will experience how features seamlessly integrates all the phases of ML lifecycle:\n",
"\n",
"This tutorial is the first part of a three part series. In this tutorial you will:\n",
"- Create a new minimal feature store resource\n",
"- Develop and test featureset locally with feature transformation capability\n",
"- Register a feature-store entity with the feature store\n",
"- Register the featureset that you developed with the feature store\n",
"- Generate sample training data dataframe using the features you created\n",
"- Enable offline materialization on the feature sets, and backfill the feature data\n"
]
},
{
"attachments": {},
"cell_type": "markdown",
"metadata": {
"nteract": {
"transient": {
"deleting": false
}
}
},
"source": [
"## Prerequisites\n",
"\n",
"> [!NOTE]\n",
"> This tutorial uses Azure Machine Learning notebook with **Serverless Spark Compute**.\n",
"\n",
"Before following the steps in this article, make sure you have the following prerequisites:\n",
"\n",
"* An Azure Machine Learning workspace. If you don't have one, use the steps in the [Quickstart: Create workspace resources](https://learn.microsoft.com/en-us/azure/machine-learning/quickstart-create-resources?view=azureml-api-2) article to create one.\n",
"* To perform the steps in this article, your user account must be assigned the owner or contributor role to a resource group where the feature store will be created"
]
},
{
"attachments": {},
"cell_type": "markdown",
"metadata": {},
"source": [
"### Note\n",
"This tutorial series has two tracks:\n",
"1. SDK only track: Uses only Python SDKs. This is suitable if pure Python-based development and deployment is preferred.\n",
"1. SDK & CLI track: Uses CLI for CRUD (create, read, update, and delete) operations and Python SDK for feature set development and testing only. This is useful in CI/CD or GitOps scenarios where CLI/YAML is preferred."
]
},
{
"attachments": {},
"cell_type": "markdown",
"metadata": {
"nteract": {
"transient": {
"deleting": false
}
}
},
"source": [
"## Set up\n",
"\n",
"This tutorial uses the Python feature store core SDK (`azureml-featurestore`) for feature set development and testing. The CLI is used for create, read, update, and delete (CRUD) operations, on feature stores, feature sets, and feature store entities.\n",
"\n",
"You don't need to explicitly install these resources for this tutorial, because in the set-up instructions shown here, the `conda.yml` file covers them.\n",
"\n",
"To prepare the notebook environment for development:\n",
"\n",
"1. In the Azure Machine Learning studio environment, select Notebooks on the left pane, and then select the Samples tab.\n",
"\n",
"1. Browse to the featurestore_sample directory (select **Samples** > **SDK v2** > **sdk** > **python** > **featurestore_sample**), and then select **Clone**.\n",
"\n",
"1. The **Select target directory** panel opens. Select the **Users** directory and then select _your user name_, and then select **Clone**.\n",
"\n",
"1. Run the tutorial\n",
"\n",
" * Option 1: Create a new notebook, and execute the instructions in this document, step by step.\n",
" * Option 2: Open existing notebook `featurestore_sample/notebooks/sdk_and_cli/1. Develop a feature set and register with managed feature store.ipynb`. You may keep this document open and refer to it for more explanation and documentation links.\n",
"\n",
"1. To configure the notebook environment, you must upload the conda.yml file\n",
"\n",
" 1. Select **Notebooks** on the left pane, and then select the **Files** tab.\n",
" 1. Browse to the *env* directory (select **Users** > *your_user_name* > **featurestore_sample** > **project** > **env**), and then select the conda.yml file.\n",
" 1. Select **Download**\n",
" 1. Select **Serverless Spark Compute** in the top navigation **Compute** dropdown. This operation might take one to two minutes. Wait for a status bar in the top to display **Configure session**.\n",
" 1. Select **Configure session** in the top status bar.\n",
" 1. Select **Python packages**.\n",
" 1. Select **Upload conda file**.\n",
" 1. Select the `conda.yml` you downloaded on your local device.\n",
" 1. (Optional) Increase the session time-out (idle time in minutes) to reduce the serverless spark cluster startup time.\n",
"\n",
"__Important:__ Except for this step, you need to run all the other steps every time you have a new spark session/session time out.\n"
]
},
{
"attachments": {},
"cell_type": "markdown",
"metadata": {
"nteract": {
"transient": {
"deleting": false
}
}
},
"source": [
"#### Start spark session\n",
"Execute the following code cell to start the Spark session. It wil take approximately 10 minutes to install all dependencies and start the Spark session."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"gather": {
"logged": 1697155816751
},
"jupyter": {
"outputs_hidden": false,
"source_hidden": false
},
"name": "start-spark-session",
"nteract": {
"transient": {
"deleting": false
}
},
"tags": []
},
"outputs": [],
"source": [
"# Run this cell to start the spark session (any code block will start the session). This can take around 10 mins.\n",
"print(\"start spark session\")"
]
},
{
"attachments": {},
"cell_type": "markdown",
"metadata": {
"nteract": {
"transient": {
"deleting": false
}
}
},
"source": [
"#### Setup root directory for the samples\n",
"This code cell sets up the root directory for the samples."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"gather": {
"logged": 1697155915577
},
"jupyter": {
"outputs_hidden": false,
"source_hidden": false
},
"name": "root-dir",
"nteract": {
"transient": {
"deleting": false
}
}
},
"outputs": [],
"source": [
"import os\n",
"\n",
"# Please update your alias belpw (or any custom directory you uploaded the samples to).\n",
"# You can find the name from the directory structure in the left nav\n",
"root_dir = \"./Users/<your_user_alias>/featurestore_sample\"\n",
"\n",
"if os.path.isdir(root_dir):\n",
" print(\"The folder exists.\")\n",
"else:\n",
" print(\"The folder does not exist. Please create or fix the path\")"
]
},
{
"attachments": {},
"cell_type": "markdown",
"metadata": {
"nteract": {
"transient": {
"deleting": false
}
}
},
"source": [
"#### Setup CLI\n",
"\n",
"1. Install AzureML CLI extention\n",
"1. Authenticate\n",
"1. Set the default subscription"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"gather": {
"logged": 1697155966146
},
"jupyter": {
"outputs_hidden": false,
"source_hidden": false
},
"name": "install-ml-ext-cli",
"nteract": {
"transient": {
"deleting": false
}
},
"tags": [
"active-ipynb"
]
},
"outputs": [],
"source": [
"# Install AzureML CLI extension\n",
"!az extension add --name ml"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"gather": {
"logged": 1697156018290
},
"jupyter": {
"outputs_hidden": true,
"source_hidden": false
},
"name": "auth-cli",
"nteract": {
"transient": {
"deleting": false
}
},
"tags": [
"active-ipynb"
]
},
"outputs": [],
"source": [
"# Authenticate\n",
"!az login"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"gather": {
"logged": 1697156029362
},
"jupyter": {
"outputs_hidden": false,
"source_hidden": false
},
"name": "set-default-subs-cli",
"nteract": {
"transient": {
"deleting": false
}
}
},
"outputs": [],
"source": [
"# Set default subscription\n",
"import os\n",
"\n",
"subscription_id = os.environ[\"AZUREML_ARM_SUBSCRIPTION\"]\n",
"\n",
"!az account set -s $subscription_id"
]
},
{
"attachments": {},
"cell_type": "markdown",
"metadata": {
"nteract": {
"transient": {
"deleting": false
}
}
},
"source": [
"## Note\n",
"Feature store Vs Project workspace: You will use a feature store to reuse features across projects. You will use a project workspace (the current workspace) to train and inference models, by leveraging features from feature stores. Many project workspaces can share and reuse a same feature store.\n",
"\n",
"## Note\n",
"In this tutorial you will be using CLI and feature store core SDK:\n",
"\n",
"1. CLI: You will use CLI for CRUD (create, read, update, and delete) operations on feature store, feature set and feature store entities.\n",
"2. Feature store core SDK: This SDK (`azureml-featurestore`) is meant to be used for feature set development and consumption (you will learn more about these operations later):\n",
" - List/Get registered feature set\n",
" - Generate/resolve feature retrieval specification\n",
" - Execute featureset definition to generate Spark dataframe\n",
" - Generate training data using a point-in-time join\n",
"\n",
"For this tutorial so you do not need to install any of these explicitly, since the instructions already cover them (`conda.yml` in the above step include these)"
]
},
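{
"cell_type": "markdown",
"metadata": {},
"source": [
"The preview below is a sketch only. It assumes the `featurestore` client created in Step 1c and the observation data loaded in Step 5a, so it is not meant to be run at this point; each call is shown with real arguments later in this tutorial.\n",
"\n",
"```python\n",
"# Illustrative sketch only -- each of these calls appears with real arguments later in this notebook.\n",
"from azureml.featurestore import get_offline_features\n",
"\n",
"fs = featurestore.feature_sets.get(\"transactions\", \"1\")  # get a registered feature set\n",
"fs_df = fs.to_spark_dataframe()  # execute the feature set definition to generate a Spark dataframe\n",
"features = featurestore.resolve_feature_uri(\n",
"    [\"transactions:1:transaction_amount_3d_avg\"]\n",
")  # resolve features specified as featureset:version:feature strings\n",
"training_df = get_offline_features(\n",
"    features=features,\n",
"    observation_data=observation_data_df,\n",
"    timestamp_column=\"timestamp\",\n",
")  # generate training data using a point-in-time join\n",
"```"
]
},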
{
"attachments": {},
"cell_type": "markdown",
"metadata": {
"nteract": {
"transient": {
"deleting": false
}
}
},
"source": [
"## Step 1: Create a minimal feature store"
]
},
{
"attachments": {},
"cell_type": "markdown",
"metadata": {
"nteract": {
"transient": {
"deleting": false
}
}
},
"source": [
"#### Step 1a: Set feature store parameters\n",
"Set name, location and other values for the feature store."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"gather": {
"logged": 1697156060740
},
"jupyter": {
"outputs_hidden": false,
"source_hidden": false
},
"name": "fs-params",
"nteract": {
"transient": {
"deleting": false
}
}
},
"outputs": [],
"source": [
"# We use the subscription, resource group, region of this active project workspace.\n",
"# You can optionally replace them to create the resources in a different subsciprtion/resourceGroup, or use existing resources\n",
"import os\n",
"\n",
"featurestore_name = \"<FEATURESTORE_NAME>\"\n",
"featurestore_location = \"eastus\"\n",
"featurestore_subscription_id = os.environ[\"AZUREML_ARM_SUBSCRIPTION\"]\n",
"featurestore_resource_group_name = os.environ[\"AZUREML_ARM_RESOURCEGROUP\"]\n",
"\n",
"feature_store_arm_id = \"/subscriptions/{sub_id}/resourceGroups/{rg}/providers/Microsoft.MachineLearningServices/workspaces/{ws_name}\".format(\n",
" sub_id=featurestore_subscription_id,\n",
" rg=featurestore_resource_group_name,\n",
" ws_name=featurestore_name,\n",
")\n",
"\n",
"print(feature_store_arm_id)"
]
},
{
"attachments": {},
"cell_type": "markdown",
"metadata": {
"nteract": {
"transient": {
"deleting": false
}
}
},
"source": [
"#### Step 1b: Create the feature store"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"gather": {
"logged": 1683415482376
},
"jupyter": {
"outputs_hidden": false,
"source_hidden": false
},
"name": "create-fs-cli",
"nteract": {
"transient": {
"deleting": false
}
},
"tags": [
"active-ipynb"
]
},
"outputs": [],
"source": [
"!az ml feature-store create --subscription $featurestore_subscription_id --resource-group $featurestore_resource_group_name --location $featurestore_location --name $featurestore_name"
]
},
{
"attachments": {},
"cell_type": "markdown",
"metadata": {
"nteract": {
"transient": {
"deleting": false
}
}
},
"source": [
"#### Step 1c: Initialize AzureML feature store core SDK client\n",
"As explained above, this is used to develop and consume features."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"gather": {
"logged": 1697156242248
},
"jupyter": {
"outputs_hidden": false,
"source_hidden": false
},
"name": "init-fs-core-sdk",
"nteract": {
"transient": {
"deleting": false
}
}
},
"outputs": [],
"source": [
"# feature store client\n",
"from azureml.featurestore import FeatureStoreClient\n",
"from azure.ai.ml.identity import AzureMLOnBehalfOfCredential\n",
"\n",
"featurestore = FeatureStoreClient(\n",
" credential=AzureMLOnBehalfOfCredential(),\n",
" subscription_id=featurestore_subscription_id,\n",
" resource_group_name=featurestore_resource_group_name,\n",
" name=featurestore_name,\n",
")"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"#### Step 1d. Grant \"AzureML Data Scientist\" role on the feature store to your user identity"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"- Get your AAD object id from Azure portal following this instruction: https://learn.microsoft.com/en-us/partner-center/find-ids-and-domain-names#find-the-user-object-id\\\n",
"\n",
"Assign **AzureML Data Scientist** role to your user identity so that it can create resources in feature store workspace. Please note that it may take some time for the permissions to propagate."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"name": "assign-aad-ds-role-cli"
},
"outputs": [],
"source": [
"your_aad_objectid = \"<USER_AAD_OBJECTID>\"\n",
"\n",
"!az role assignment create --role \"AzureML Data Scientist\" --assignee-object-id $your_aad_objectid --assignee-principal-type User --scope $feature_store_arm_id"
]
},
{
"attachments": {},
"cell_type": "markdown",
"metadata": {
"nteract": {
"transient": {
"deleting": false
}
}
},
"source": [
"## Step 2: Prototype and develop a transaction rolling aggregation feature set in this notebook"
]
},
{
"attachments": {},
"cell_type": "markdown",
"metadata": {
"nteract": {
"transient": {
"deleting": false
}
}
},
"source": [
"#### Step 2a: Explore the transactions source data\n",
"\n",
"#### Note\n",
"The sample data used in this notebook is hosted in a public accessible blob container. It can only be read in Spark via `wasbs` driver. When you create feature sets using your own source data, please host them in ADLS Gen2 account and use `abfss` driver in the data path. "
]
},
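{
"cell_type": "markdown",
"metadata": {},
"source": [
"For reference, a data path for your own source data in an ADLS Gen2 account uses the `abfss` scheme. The snippet below is illustrative only; the placeholders do not refer to real resources:\n",
"\n",
"```python\n",
"# Illustrative abfss path format for your own ADLS Gen2 source data (placeholders, not real resources):\n",
"your_source_data_path = \"abfss://<container>@<storage_account_name>.dfs.core.windows.net/<path_to_data>/*.parquet\"\n",
"```"
]
},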
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"gather": {
"logged": 1697156282781
},
"jupyter": {
"outputs_hidden": false,
"source_hidden": false
},
"name": "explore-txn-src-data",
"nteract": {
"transient": {
"deleting": false
}
}
},
"outputs": [],
"source": [
"# Remove the \".\" in the roor directory path as we need to generate absolute path to read from Spark.\n",
"transactions_source_data_path = \"wasbs://data@azuremlexampledata.blob.core.windows.net/feature-store-prp/datasources/transactions-source/*.parquet\"\n",
"transactions_src_df = spark.read.parquet(transactions_source_data_path)\n",
"\n",
"display(transactions_src_df.head(5))\n",
"# Note: display(training_df.head(5)) displays the timestamp column in a different format. You can can call transactions_src_df.show() to see correctly formatted value"
]
},
{
"attachments": {},
"cell_type": "markdown",
"metadata": {
"nteract": {
"transient": {
"deleting": false
}
}
},
"source": [
"#### Step 2b: Develop a transactions feature set locally\n",
"\n",
"Feature set specification is a self-contained definition of feature set that can be developed and tested locally.\n",
"\n",
"Lets create the following rolling window aggregate features:\n",
"- transactions 3-day count\n",
"- transactions amount 3-day sum\n",
"- transactions amount 3-day avg\n",
"- transactions 7-day count\n",
"- transactions amount 7-day sum\n",
"- transactions amount 7-day avg\n",
"\n",
"__Action__:\n",
"- Inspect the feature transformation code file: `featurestore/featuresets/transactions/spec/transformation_code/transaction_transform.py`. You will see how is the rolling aggregation defined for the features. This is a Spark transformer.\n",
"\n",
"To understand the feature set and transformations in more detail, see [feature store concepts](https://learn.microsoft.com/azure/machine-learning/concept-what-is-managed-feature-store)."
]
},
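{
"cell_type": "markdown",
"metadata": {},
"source": [
"For orientation, here is a simplified, hypothetical sketch of what such a rolling-aggregation Spark transformer can look like. The column names used here (`transactionID`, `transactionAmount`) are assumptions for illustration; refer to `transaction_transform.py` for the actual implementation used by this tutorial.\n",
"\n",
"```python\n",
"# Hypothetical sketch of a rolling-aggregation Spark transformer (illustration only).\n",
"# Column names (accountID, timestamp, transactionID, transactionAmount) are assumptions;\n",
"# see transaction_transform.py for the transformer actually used in this tutorial.\n",
"from pyspark.ml import Transformer\n",
"from pyspark.sql import functions as F\n",
"from pyspark.sql.window import Window\n",
"\n",
"\n",
"class RollingAggregationSketch(Transformer):\n",
"    def _transform(self, df):\n",
"        days = lambda d: d * 24 * 3600  # rangeBetween operates on seconds once timestamp is cast to long\n",
"        w_3d = (\n",
"            Window.partitionBy(\"accountID\")\n",
"            .orderBy(F.col(\"timestamp\").cast(\"long\"))\n",
"            .rangeBetween(-days(3), 0)\n",
"        )\n",
"        w_7d = (\n",
"            Window.partitionBy(\"accountID\")\n",
"            .orderBy(F.col(\"timestamp\").cast(\"long\"))\n",
"            .rangeBetween(-days(7), 0)\n",
"        )\n",
"        return (\n",
"            df.withColumn(\"transaction_3d_count\", F.count(\"transactionID\").over(w_3d))\n",
"            .withColumn(\"transaction_amount_3d_sum\", F.sum(\"transactionAmount\").over(w_3d))\n",
"            .withColumn(\"transaction_amount_3d_avg\", F.avg(\"transactionAmount\").over(w_3d))\n",
"            .withColumn(\"transaction_7d_count\", F.count(\"transactionID\").over(w_7d))\n",
"            .withColumn(\"transaction_amount_7d_sum\", F.sum(\"transactionAmount\").over(w_7d))\n",
"            .withColumn(\"transaction_amount_7d_avg\", F.avg(\"transactionAmount\").over(w_7d))\n",
"        )\n",
"```"
]
},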
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"gather": {
"logged": 1697156302360
},
"jupyter": {
"outputs_hidden": false,
"source_hidden": false
},
"name": "develop-txn-fset-locally",
"nteract": {
"transient": {
"deleting": false
}
}
},
"outputs": [],
"source": [
"from azureml.featurestore import create_feature_set_spec, FeatureSetSpec\n",
"from azureml.featurestore.contracts import (\n",
" DateTimeOffset,\n",
" FeatureSource,\n",
" TransformationCode,\n",
" Column,\n",
" ColumnType,\n",
" SourceType,\n",
" TimestampColumn,\n",
")\n",
"\n",
"\n",
"transactions_featureset_code_path = (\n",
" root_dir + \"/featurestore/featuresets/transactions/transformation_code\"\n",
")\n",
"\n",
"transactions_featureset_spec = create_feature_set_spec(\n",
" source=FeatureSource(\n",
" type=SourceType.parquet,\n",
" path=\"wasbs://data@azuremlexampledata.blob.core.windows.net/feature-store-prp/datasources/transactions-source/*.parquet\",\n",
" timestamp_column=TimestampColumn(name=\"timestamp\"),\n",
" source_delay=DateTimeOffset(days=0, hours=0, minutes=20),\n",
" ),\n",
" transformation_code=TransformationCode(\n",
" path=transactions_featureset_code_path,\n",
" transformer_class=\"transaction_transform.TransactionFeatureTransformer\",\n",
" ),\n",
" index_columns=[Column(name=\"accountID\", type=ColumnType.string)],\n",
" source_lookback=DateTimeOffset(days=7, hours=0, minutes=0),\n",
" temporal_join_lookback=DateTimeOffset(days=1, hours=0, minutes=0),\n",
" infer_schema=True,\n",
")"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"tags": [
"active-ipynb"
]
},
"outputs": [],
"source": [
"# Generate a spark dataframe from the feature set specification.\n",
"transactions_fset_df = transactions_featureset_spec.to_spark_dataframe()\n",
"# Display few records.\n",
"display(transactions_fset_df.head(5))"
]
},
{
"attachments": {},
"cell_type": "markdown",
"metadata": {
"nteract": {
"transient": {
"deleting": false
}
}
},
"source": [
"#### Step 2c: Export as feature set specification\n",
"In order to register the feature set specification with the feature store, it needs to be saved in a specific format. \n",
"Action: Please inspect the generated `transactions` FeaturesetSpec: Open this file from the file tree to see the specification: `featurestore/featuresets/accounts/spec/FeaturesetSpec.yaml`\n",
"\n",
"Specification contains these important elements:\n",
"\n",
"1. `source`: Reference to a storage. In this case a parquet file in a blob storage.\n",
"1. `features`: List of features and their datatypes. If you provide transformation code (see Day 2 section), the code has to return a dataframe that maps to the features and datatypes.\n",
"1. `index_columns`: The join keys required to access values from the feature set\n",
"\n",
"Learn more about it in the [top level feature store entities document](https://learn.microsoft.com/azure/machine-learning/concept-top-level-entities-in-managed-feature-store) and the [feature set specification YAML reference](https://learn.microsoft.com/azure/machine-learning/reference-yaml-featureset-spec).\n",
"\n",
"The additional benefit of persisting the feature set specification is that it can be source controlled."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"gather": {
"logged": 1697156318907
},
"jupyter": {
"outputs_hidden": false,
"source_hidden": false
},
"name": "dump-transactions-fs-spec",
"nteract": {
"transient": {
"deleting": false
}
}
},
"outputs": [],
"source": [
"import os\n",
"\n",
"# Create a new folder to dump the feature set specification.\n",
"transactions_featureset_spec_folder = (\n",
" root_dir + \"/featurestore/featuresets/transactions/spec\"\n",
")\n",
"\n",
"# Check if the folder exists, create one if it does not exist.\n",
"if not os.path.exists(transactions_featureset_spec_folder):\n",
" os.makedirs(transactions_featureset_spec_folder)\n",
"\n",
"transactions_featureset_spec.dump(transactions_featureset_spec_folder, overwrite=True)"
]
},
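{
"cell_type": "markdown",
"metadata": {},
"source": [
"Optionally, you can peek at the top-level elements of the dumped specification directly from Python. This is an illustrative sketch; it assumes the dumped file is named `FeaturesetSpec.yaml` (as referenced above) and that PyYAML is available in the session:\n",
"\n",
"```python\n",
"# Optional sketch: list the top-level elements (source, features, index_columns, ...) of the dumped spec.\n",
"# Assumes the dumped file is named FeaturesetSpec.yaml and that PyYAML is available in this environment.\n",
"import os\n",
"import yaml\n",
"\n",
"spec_file = os.path.join(transactions_featureset_spec_folder, \"FeaturesetSpec.yaml\")\n",
"with open(spec_file) as f:\n",
"    spec_yaml = yaml.safe_load(f)\n",
"\n",
"print(list(spec_yaml.keys()))\n",
"```"
]
},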
{
"attachments": {},
"cell_type": "markdown",
"metadata": {
"nteract": {
"transient": {
"deleting": false
}
}
},
"source": [
"## Step 3: Register a feature store entity\n",
"Entity helps enforce best practice that same join key definitions are used across feature sets which uses the same logical entities. Examples of entities are account entity, customer entity etc. Entities are typically created once and reused across feature-sets. For information on basics concept of feature store, see [feature store concepts](https://learn.microsoft.com/azure/machine-learning/concept-what-is-managed-feature-store)."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"gather": {
"logged": 1697157524651
},
"jupyter": {
"outputs_hidden": false,
"source_hidden": false
},
"name": "register-acct-entity-cli",
"nteract": {
"transient": {
"deleting": false
}
},
"tags": [
"active-ipynb"
]
},
"outputs": [],
"source": [
"account_entity_path = root_dir + \"/featurestore/entities/account.yaml\"\n",
"!az ml feature-store-entity create --file $account_entity_path --resource-group $featurestore_resource_group_name --feature-store-name $featurestore_name"
]
},
{
"attachments": {},
"cell_type": "markdown",
"metadata": {
"nteract": {
"transient": {
"deleting": false
}
}
},
"source": [
"## Step 4: Register the transaction feature set with the feature store\n",
"You register a feature set asset with the feature store so that you can share and reuse with others. You also get managed capabilities like versioning and materialization (we will learn in this tutorial series).\n",
"\n",
"The feature set asset has reference to the feature set spec that you created earlier and additional properties like version and materialization settings."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"gather": {
"logged": 1697157498133
},
"jupyter": {
"outputs_hidden": false,
"source_hidden": false
},
"name": "register-txn-fset-cli",
"nteract": {
"transient": {
"deleting": false
}
},
"tags": [
"active-ipynb"
]
},
"outputs": [],
"source": [
"account_featureset_path = (\n",
" root_dir + \"/featurestore/featuresets/transactions/featureset_asset.yaml\"\n",
")\n",
"!az ml feature-set create --file $account_featureset_path --resource-group $featurestore_resource_group_name --feature-store-name $featurestore_name"
]
},
{
"attachments": {},
"cell_type": "markdown",
"metadata": {
"nteract": {
"transient": {
"deleting": false
}
}
},
"source": [
"#### Explore the feature store UI\n",
"* Goto the [Azure ML global landing page](https://ml.azure.com/home).\n",
"* Click on **Feature stores** in the left navigation.\n",
"* You will see the list of feature stores that you have access to. Click on the feature store that you have created above.\n",
"\n",
"You can see the feature sets and entities that you have created.\n",
"\n",
"Note: Creating and updating feature store assets (feature sets and entities) is possible only through SDK and CLI. You can use the UI to search/browse the feature store."
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"#### Grant \"Storage Blob Data Reader\" role on the offline store to your user identity\n",
"If feature data is materialized, then you need this role to read feature data from offline materialization store.\n",
"- Get information about the offline materialization store from the Feature Store **Overview** page in the Feature Store UI. The storage account subscription ID, storage account resource group name, and storage account name for offline materialization store can be found on **Offline materialization store** card. \n",
" \n",
"\n",
"To learn more about access control, see access control document in the docs.\n",
"\n",
"Execute the following code cell for role assignment. Please note that it may take some time for permissions to propagate. "
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"name": "grant-rbac-to-user-identity-cli"
},
"outputs": [],
"source": [
"storage_subscription_id = \"<SUBSCRIPTION_ID>\"\n",
"storage_resource_group_name = \"<RESOURCE_GROUP>\"\n",
"storage_account_name = \"<STORAGE_ACCOUNT_NAME>\"\n",
"\n",
"# Set the ADLS Gen2 storage account ARM ID\n",
"gen2_storage_arm_id = \"/subscriptions/{sub_id}/resourceGroups/{rg}/providers/Microsoft.Storage/storageAccounts/{account}\".format(\n",
" sub_id=storage_subscription_id,\n",
" rg=storage_resource_group_name,\n",
" account=storage_account_name,\n",
")\n",
"\n",
"print(gen2_storage_arm_id)\n",
"\n",
"!az role assignment create --role \"Storage Blob Data Reader\" --assignee-object-id $your_aad_objectid --assignee-principal-type User --scope $gen2_storage_arm_id"
]
},
{
"attachments": {},
"cell_type": "markdown",
"metadata": {
"nteract": {
"transient": {
"deleting": false
}
}
},
"source": [
"## Step 5: Generate a training data dataframe using the registered features"
]
},
{
"attachments": {},
"cell_type": "markdown",
"metadata": {
"nteract": {
"transient": {
"deleting": false
}
}
},
"source": [
"#### Step 5a: Load observation data\n",
"\n",
"We start by exploring the observation data. Observation data is typically the core data used in training and inference data. This is then joined with feature data to create the full training data. Observation data is the data captured during the time of the event: in this case it has core transaction data including transaction ID, account ID, transaction amount. In this case, since it is for training, it also has the target variable appended (`is_fraud`).\n",
"\n",
"To learn more core concepts including observation data, refer to the feature store documentation."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"gather": {
"logged": 1697156393056
},
"jupyter": {
"outputs_hidden": false,
"source_hidden": false
},
"name": "load-obs-data",
"nteract": {
"transient": {
"deleting": false
}
}
},
"outputs": [],
"source": [
"observation_data_path = \"wasbs://data@azuremlexampledata.blob.core.windows.net/feature-store-prp/observation_data/train/*.parquet\"\n",
"observation_data_df = spark.read.parquet(observation_data_path)\n",
"obs_data_timestamp_column = \"timestamp\"\n",
"\n",
"display(observation_data_df)\n",
"# Note: the timestamp column is displayed in a different format. Optionally, you can can call training_df.show() to see correctly formatted value"
]
},
{
"attachments": {},
"cell_type": "markdown",
"metadata": {
"nteract": {
"transient": {
"deleting": false
}
}
},
"source": [
"#### Step 5b: Get the registered feature set and list its features"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"gather": {
"logged": 1697156408060
},
"jupyter": {
"outputs_hidden": false,
"source_hidden": false
},
"name": "get-txn-fset",
"nteract": {
"transient": {
"deleting": false
}
}
},
"outputs": [],
"source": [
"# Look up the featureset by providing a name and a version.\n",
"transactions_featureset = featurestore.feature_sets.get(\"transactions\", \"1\")\n",
"# List its features.\n",
"transactions_featureset.features"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"gather": {
"logged": 1697156423972
},
"jupyter": {
"outputs_hidden": false,
"source_hidden": false
},
"name": "print-txn-fset-sample-values",
"nteract": {
"transient": {
"deleting": false
}
},
"tags": [
"active-ipynb"
]
},
"outputs": [],
"source": [
"# Print sample values.\n",
"display(transactions_featureset.to_spark_dataframe().head(5))"
]
},
{
"attachments": {},
"cell_type": "markdown",
"metadata": {
"nteract": {
"transient": {
"deleting": false
}
}
},
"source": [
"#### Step 5c: Select features and generate training data\n",
"In this step we will select features that we would like to be part of training data and use the feature store SDK to generate the training data."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"gather": {
"logged": 1697156450018
},
"jupyter": {
"outputs_hidden": false,
"source_hidden": false
},
"name": "select-features-and-gen-training-data",
"nteract": {
"transient": {
"deleting": false
}
}
},
"outputs": [],
"source": [
"from azureml.featurestore import get_offline_features\n",
"\n",
"# You can select features in pythonic way.\n",
"features = [\n",
" transactions_featureset.get_feature(\"transaction_amount_7d_sum\"),\n",
" transactions_featureset.get_feature(\"transaction_amount_7d_avg\"),\n",
"]\n",
"\n",
"# You can also specify features in string form: featureset:version:feature.\n",
"more_features = [\n",
" \"transactions:1:transaction_3d_count\",\n",
" \"transactions:1:transaction_amount_3d_avg\",\n",
"]\n",
"\n",
"more_features = featurestore.resolve_feature_uri(more_features)\n",
"features.extend(more_features)"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"tags": [
"active-ipynb"
]
},
"outputs": [],
"source": [
"# Generate training dataframe by using feature data and observation data.\n",
"training_df = get_offline_features(\n",
" features=features,\n",
" observation_data=observation_data_df,\n",
" timestamp_column=obs_data_timestamp_column,\n",
")\n",
"\n",
"# Ignore the message that says feature set is not materialized (materialization is optional). We will enable materialization in the next part of the tutorial.\n",
"display(training_df)\n",
"# Note: the timestamp column is displayed in a different format. Optionally, you can can call training_df.show() to see correctly formatted value"
]
},
{
"attachments": {},
"cell_type": "markdown",
"metadata": {
"nteract": {
"transient": {
"deleting": false
}
}
},
"source": [
"You can see how the features are appended to the training data using a point-in-time join."
]
},
{
"cell_type": "markdown",
"metadata": {
"nteract": {
"transient": {
"deleting": false
}
}
},
"source": [
"## Step 6: Enable offline materialization on transactions feature set\n",
"Once materialization is enabled on a feature set, you can perform backfill (this tutorial) or schedule recurrent materialization jobs (shown in a later tutorial).\n",
"\n",
"#### Set spark.sql.shuffle.partitions in the yaml file according to the feature data size\n",
"\n",
"The spark configuration `spark.sql.shuffle.partitions` is an OPTIONAL parameter that can affect the number of parquet files generated (per day) when the feature set is materialized into the offline store. The default value of this parameter is 200. The best practice is to avoid generating many small parquet files. If offline feature retrieval turns out to become slow after the feature set is materialized, please go to the corresponding folder in the offline store to check whether it is the issue of having too many small parquet files (per day), and adjust the value of this parameter accordingly.\n",
"\n",
"*Note: The sample data used in this notebook is small. So this parameter is set to 1 in the featureset_asset_offline_enabled.yaml file.*"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"gather": {
"logged": 1697157176939
},
"jupyter": {
"outputs_hidden": false,
"source_hidden": false
},
"name": "enable-offline-mat-txns-fset-cli",
"nteract": {
"transient": {
"deleting": false
}
}
},
"outputs": [],
"source": [
"transaction_asset_mat_yaml = (\n",
" root_dir\n",
" + \"/featurestore/featuresets/transactions/featureset_asset_offline_enabled.yaml\"\n",
")\n",
"\n",
"!az ml feature-set update --file $transaction_asset_mat_yaml --resource-group $featurestore_resource_group_name --feature-store-name $featurestore_name"
]
},
{
"cell_type": "markdown",
"metadata": {
"nteract": {
"transient": {
"deleting": false
}
}
},
"source": [
"## Step 7: Backfill data for transactions feature set\n",
"Materialization is the process of computing the feature values for a given feature window and storing this in an materialization store. Materializing the features will increase its reliability and availability. All feature queries will use the materialized values from the materialization store. In this step you perform a one-time backfill for a feature window of __18 months__.\n",
"\n",
"The following code cell will materialize data by current status *None* or *Incomplete* for the defined feature window. \n",
"\n",
"#### Note\n",
"How to determine the window of backfill data needed? It has to match with the window of your training data. For e.g. if you want to train with two years of data, then you will want to be able to retrieve features for the same window, so you will backfill for a two year window."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"gather": {
"logged": 1697157219809
},
"jupyter": {
"outputs_hidden": false,
"source_hidden": false
},
"name": "backfill-txns-fset-cli",
"nteract": {
"transient": {
"deleting": false
}
}
},
"outputs": [],
"source": [
"feature_window_start_time = \"2022-01-01T00:00.000Z\"\n",
"feature_window_end_time = \"2023-06-30T00:00.000Z\"\n",
"\n",
"!az ml feature-set backfill --name transactions --version 1 --by-data-status \"['None', 'Incomplete']\" --feature-window-start-time $feature_window_start_time --feature-window-end-time $feature_window_end_time --feature-store-name $featurestore_name --resource-group $featurestore_resource_group_name"
]
},
{
"cell_type": "markdown",
"metadata": {
"nteract": {
"transient": {
"deleting": false
}
}
},
"source": [
"Let's print sample data from the feature set. You can notice from the output information that the data was retrieved from the materilization store. `get_offline_features()` method that is used to retrieve training/inference data will also use the materialization store by default."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"gather": {
"logged": 1697157361591
},
"jupyter": {
"outputs_hidden": false,
"source_hidden": false
},
"name": "sample-txns-fset-data-cli",
"nteract": {
"transient": {
"deleting": false
}
},
"tags": [
"active-ipynb"
]
},
"outputs": [],
"source": [
"# Look up the feature set by providing a name and a version and display few records.\n",
"transactions_featureset = featurestore.feature_sets.get(\"transactions\", \"1\")\n",
"display(transactions_featureset.to_spark_dataframe().head(5))"
]
},
{
"attachments": {},
"cell_type": "markdown",
"metadata": {
"nteract": {
"transient": {
"deleting": false
}
}
},
"source": [
"## Cleanup\n",
"\n",
"Tutorial `3. Enable recurrent materialization and run batch inference` has instructions deleting the resources."
]
},
{
"attachments": {},
"cell_type": "markdown",
"metadata": {
"nteract": {
"transient": {
"deleting": false
}
}
},
"source": [
"## Next steps\n",
"* Experiment and train models using features"
]
}
],
"metadata": {
"celltoolbar": "Edit Metadata",
"kernel_info": {
"name": "synapse_pyspark"
},
"kernelspec": {
"display_name": "Synapse PySpark",
"language": "Python",
"name": "synapse_pyspark"
},
"language_info": {
"codemirror_mode": "ipython",
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython",
"version": "3.8.0"
},
"microsoft": {
"host": {
"AzureML": {
"notebookHasBeenCompleted": true
}
},
"ms_spell_check": {
"ms_ignore_dictionary": [
"dataframe",
"featureset",
"operationalization",
"operationalize"
],
"ms_spell_check_language": "en"
}
},
"nteract": {
"version": "nteract-front-end@1.0.0"
}
},
"nbformat": 4,
"nbformat_minor": 2
}