
{ "cells": [ { "cell_type": "markdown", "metadata": { "nteract": { "transient": { "deleting": false } } }, "source": [ "# Tutorial #7: Develop a feature set using Domain Specific Language (preview)\n", "\n", "Domain Specific Language (DSL) for the managed feature store provides a simple and user-friendly way of defining the most commonly used feature aggregations. The feature store SDK allows users to perform most commonly used aggregations by using a DSL *expression*. These aggregations ensure consistent results when compared with user-defined functions (UDFs) without the overhead of writing UDFs." ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "#### Important\n", "\n", "This feature is currently in public preview. This preview version is provided without a service-level agreement, and it's not recommended for production workloads. Certain features might not be supported or might have constrained capabilities. For more information, see [Supplemental Terms of Use for Microsoft Azure Previews](https://azure.microsoft.com/support/legal/preview-supplemental-terms/)." ] }, { "cell_type": "markdown", "metadata": { "nteract": { "transient": { "deleting": false } } }, "source": [ "In this tutorial you will:\n", "- Create a new minimal feature store workspace.\n", "- Develop and test feature set locally by using Domain Specific Language (DSL).\n", "- Develop a feature set by using User Defined Functions (UDFs) that perform the same transformations as the feature set created using DSL.\n", "- Compare results from the feature sets created using DSL and UDFs.\n", "- Register a feature store entity with the feature store.\n", "- Register the feature set created using DSL with the feature store.\n", "- Generate sample training data using the created features." ] }, { "cell_type": "markdown", "metadata": { "nteract": { "transient": { "deleting": false } } }, "source": [ "## Prerequisites\n", "\n", "> [!NOTE]\n", "> This tutorial uses Azure Machine Learning notebook with **Serverless Spark Compute**.\n", "\n", "Before following the steps in this article, make sure you have the following prerequisites:\n", "\n", "1. An Azure Machine Learning workspace. If you don't have one, use the steps in the [Quickstart: Create workspace resources](https://learn.microsoft.com/azure/machine-learning/quickstart-create-resources?view=azureml-api-2) article to create one.\n", "1. To perform the steps in this article, your user account must be assigned the **Owner** or **Contributor** role to the resource group where the feature store will be created." ] }, { "cell_type": "markdown", "metadata": { "nteract": { "transient": { "deleting": false } } }, "source": [ "#### Prepare the notebook environment for development\n", "\n", "This tutorial uses the Python feature store core SDK (`azureml-featurestore`). The SDK is used for create, read, update, and delete (CRUD) operations, on feature stores, feature sets, and feature store entities.\n", "\n", "You don't need to explicitly install these resources for this tutorial, because in the set-up instructions shown here, the `conda.yml` file covers them.\n", "\n", "To prepare the notebook environment for development:\n", "\n", "1. In the Azure Machine Learning studio environment, select Notebooks on the left pane, and then select the Samples tab.\n", "\n", "1. Navigate to the `featurestore_sample` directory (select **Samples** > **SDK v2** > **sdk** > **python** > **featurestore_sample**), and then select **Clone**.\n", "\n", "1. 
The **Select target directory** panel opens. Select the **Users** directory, select _your user name_, and then select **Clone**.\n", "\n", "1. Run the tutorial:\n", "\n", "   * Option 1: Create a new notebook, and execute the instructions in this document, step by step.\n", "   * Option 2: Open the existing notebook `featurestore_sample/notebooks/sdk_only/7. Develop a feature set using Domain Specific Language (DSL).ipynb`. You may keep this document open and refer to it for more explanation and documentation links.\n", "\n", "1. To configure the notebook environment, you must upload the `conda.yml` file:\n", "\n", "    1. Select **Notebooks** on the left navigation panel, and then select the **Files** tab.\n", "    2. Navigate to the `env` directory (select **Users** > *your_user_name* > **featurestore_sample** > **project** > **env**), and then select the `conda.yml` file.\n", "    3. Select **Download**.\n", "    4. Select **Serverless Spark Compute** in the top navigation **Compute** dropdown. This operation might take one to two minutes. Wait for the status bar at the top to display the **Configure session** link.\n", "    5. Select **Configure session** in the top status bar.\n", "    6. Select **Settings**.\n", "    7. Set **Apache Spark version** to `Spark version 3.3`.\n", "    8. Optionally, increase the **Session timeout** (idle time) if you want to avoid frequently restarting the serverless Spark session.\n", "    9. Under **Configuration settings**, define *Property* `spark.jars.packages` and *Value* `com.microsoft.azure:azureml-fs-scala-impl:1.0.4`.\n", "       ![DSL_SPARK_JARS_PROPERTY](./images/dsl-spark-jars-property.png) \n", "    10. Select **Python packages**.\n", "    11. Select **Upload conda file**.\n", "    12. Select the `conda.yml` file you downloaded to your local device.\n", "    13. Select **Apply**.\n", "\n", "__Important:__ Except for this step, you need to run all the other steps every time you start a new Spark session, or after the session times out." ] }, { "cell_type": "markdown", "metadata": { "nteract": { "transient": { "deleting": false } } }, "source": [ "## Set up the root directory for the samples\nThis code cell sets up the root directory for the samples. It may take 10 minutes or more to execute, because it also installs all Conda dependencies and starts the Spark session." ] }, { "cell_type": "code", "execution_count": null, "metadata": { "gather": { "logged": 1709234440650 }, "name": "setup-root-dir" }, "outputs": [], "source": [ "import os\n", "\n", "# Please update your alias USER_NAME below (or any custom directory you uploaded the samples to).\n", "# You can find the name in the directory structure in the left navigation pane.\n", "root_dir = \"./Users/USER_NAME/featurestore_sample\"\n", "\n", "if os.path.isdir(root_dir):\n", "    print(\"The folder exists.\")\n", "else:\n", "    print(\"The folder does not exist. Please create it or fix the path.\")" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Create a minimal feature store" ] }, { "cell_type": "markdown", "metadata": { "nteract": { "transient": { "deleting": false } } }, "source": [ "Create a feature store in a region of your choice by using the Azure Machine Learning studio UI or the AzureML Python SDK. " ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Option 1. Create a feature store from the Azure Machine Learning studio UI\n", "\n", "1. Navigate to the feature store UI [landing page](https://ml.azure.com/featureStores).\n", "2. Select **+ Create**.\n", "3. On the **Basics** tab:\n", "    1. 
Choose a **Name** for your feature store.\n", "    2. Select the **Subscription**.\n", "    3. Select the **Resource group**.\n", "    4. Select the **Region**.\n", "    5. Select **Apache Spark version** 3.3, and then select **Next**.\n", "4. On the **Materialization** tab:\n", "    1. Toggle **Enable materialization**.\n", "    2. Under **Assign user managed identity**, select the **Subscription** and **User identity**.\n", "    3. Select **From Azure subscription** under **Offline store**.\n", "    4. Select the **Store name** and **Azure Data Lake Gen2 file system name**, and then select **Next**.\n", "5. On the **Review** tab, verify the displayed information, and then select **Create**." ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Option 2. Create a feature store from the Python SDK\n", "Provide `featurestore_name`, `featurestore_resource_group_name`, and `featurestore_subscription_id` values, and execute the following cell to create a minimal feature store." ] }, { "cell_type": "code", "execution_count": null, "metadata": { "collapsed": false, "gather": { "logged": 1709234999267 }, "jupyter": { "outputs_hidden": false, "source_hidden": false }, "name": "create-min-fs", "nteract": { "transient": { "deleting": false } } }, "outputs": [], "source": [ "import os\n", "\n", "featurestore_name = \"<FEATURE_STORE_NAME>\"\n", "featurestore_resource_group_name = \"<RESOURCE_GROUP>\"\n", "featurestore_subscription_id = \"<SUBSCRIPTION_ID>\"\n", "\n", "##### Create Feature Store #####\n", "from azure.ai.ml import MLClient\n", "from azure.ai.ml.entities import (\n", "    FeatureStore,\n", "    FeatureStoreEntity,\n", "    FeatureSet,\n", ")\n", "from azure.ai.ml.identity import AzureMLOnBehalfOfCredential\n", "\n", "ml_client = MLClient(\n", "    AzureMLOnBehalfOfCredential(),\n", "    subscription_id=featurestore_subscription_id,\n", "    resource_group_name=featurestore_resource_group_name,\n", ")\n", "featurestore_location = \"eastus\"\n", "\n", "fs = FeatureStore(name=featurestore_name, location=featurestore_location)\n", "# Wait for feature store creation\n", "fs_poller = ml_client.feature_stores.begin_create(fs)\n", "print(fs_poller.result())" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Assign permissions to your user identity on the offline store\n", "If feature data is materialized, you need to assign the **Storage Blob Data Reader** role to your user identity so that it can read feature data from the offline materialization store.\n", "- Open the [Azure ML global landing page](https://ml.azure.com/home).\n", "- Select **Feature stores** in the left navigation.\n", "- You will see the list of feature stores that you have access to. Select the feature store that you created above.\n", "- Select the storage account link under **Account name** on the **Offline materialization store** card to navigate to the ADLS Gen2 storage account for the offline store.\n", "![OFFLINE_STORE_LINK](./images/offline-store-link.png)\n", "- Follow [this documentation](https://learn.microsoft.com/azure/role-based-access-control/role-assignments-portal) to assign the **Storage Blob Data Reader** role to your user identity on the ADLS Gen2 storage account for the offline store. Allow some time for permissions to propagate." ] }, { "cell_type": "markdown", "metadata": { "nteract": { "transient": { "deleting": false } } }, "source": [ "## Create a feature set specification using DSL expressions\n", "Execute the following code cell to create a feature set specification using parquet files as source data and DSL expressions. 
Currently, the following aggregation expressions are supported:\n", "- Average - `avg`\n", "- Sum - `sum`\n", "- Minimum - `min`\n", "- Maximum - `max`\n", "- Count - `count`" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "collapsed": false, "gather": { "logged": 1709235453982 }, "jupyter": { "outputs_hidden": false, "source_hidden": false }, "name": "create-dsl-parq-fset", "nteract": { "transient": { "deleting": false } } }, "outputs": [], "source": [ "from azureml.featurestore import create_feature_set_spec\n", "from azureml.featurestore.contracts.feature import Feature\n", "from azureml.featurestore.transformation import (\n", "    TransformationExpressionCollection,\n", "    WindowAggregation,\n", ")\n", "from azureml.featurestore.contracts import (\n", "    DateTimeOffset,\n", "    TransformationCode,\n", "    Column,\n", "    ColumnType,\n", "    SourceType,\n", "    TimestampColumn,\n", ")\n", "from azureml.featurestore.feature_source import ParquetFeatureSource\n", "\n", "dsl_feature_set_spec = create_feature_set_spec(\n", "    source=ParquetFeatureSource(\n", "        path=\"wasbs://data@azuremlexampledata.blob.core.windows.net/feature-store-prp/datasources/transactions-source/*.parquet\",\n", "        timestamp_column=TimestampColumn(name=\"timestamp\"),\n", "        source_delay=DateTimeOffset(days=0, hours=0, minutes=20),\n", "    ),\n", "    index_columns=[Column(name=\"accountID\", type=ColumnType.string)],\n", "    features=[\n", "        Feature(name=\"f_transaction_3d_count\", type=ColumnType.integer),\n", "        Feature(name=\"f_transaction_amount_3d_sum\", type=ColumnType.double),\n", "        Feature(name=\"f_transaction_amount_3d_avg\", type=ColumnType.double),\n", "        Feature(name=\"f_transaction_7d_count\", type=ColumnType.integer),\n", "        Feature(name=\"f_transaction_amount_7d_sum\", type=ColumnType.double),\n", "        Feature(name=\"f_transaction_amount_7d_avg\", type=ColumnType.double),\n", "    ],\n", "    feature_transformation=TransformationExpressionCollection(\n", "        transformation_expressions=[\n", "            WindowAggregation(\n", "                feature_name=\"f_transaction_3d_count\",\n", "                aggregation=\"count\",\n", "                window=DateTimeOffset(days=3),\n", "            ),\n", "            WindowAggregation(\n", "                feature_name=\"f_transaction_amount_3d_sum\",\n", "                source_column=\"transactionAmount\",\n", "                aggregation=\"sum\",\n", "                window=DateTimeOffset(days=3),\n", "            ),\n", "            WindowAggregation(\n", "                feature_name=\"f_transaction_amount_3d_avg\",\n", "                source_column=\"transactionAmount\",\n", "                aggregation=\"avg\",\n", "                window=DateTimeOffset(days=3),\n", "            ),\n", "            WindowAggregation(\n", "                feature_name=\"f_transaction_7d_count\",\n", "                aggregation=\"count\",\n", "                window=DateTimeOffset(days=7),\n", "            ),\n", "            WindowAggregation(\n", "                feature_name=\"f_transaction_amount_7d_sum\",\n", "                source_column=\"transactionAmount\",\n", "                aggregation=\"sum\",\n", "                window=DateTimeOffset(days=7),\n", "            ),\n", "            WindowAggregation(\n", "                feature_name=\"f_transaction_amount_7d_avg\",\n", "                source_column=\"transactionAmount\",\n", "                aggregation=\"avg\",\n", "                window=DateTimeOffset(days=7),\n", "            ),\n", "        ]\n", "    ),\n", ")\n", "\n", "dsl_feature_set_spec" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "The following code cell defines the start and end times for the feature window."
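, "\n", "\n", "These fixed dates cover the sample transaction data and keep the results reproducible. As a sketch, if you were pointing at live data, you could instead derive a rolling window relative to the current time:\n", "\n", "```python\n", "from datetime import datetime, timedelta\n", "\n", "# Illustrative alternative: a rolling one-year window ending at the current time\n", "et = datetime.utcnow()\n", "st = et - timedelta(days=365)\n", "```"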
] }, { "cell_type": "code", "execution_count": null, "metadata": { "collapsed": false, "gather": { "logged": 1709235455234 }, "jupyter": { "outputs_hidden": false, "source_hidden": false }, "name": "define-feat-win", "nteract": { "transient": { "deleting": false } } }, "outputs": [], "source": [ "from datetime import datetime\n", "\n", "st = datetime(2020, 1, 1)\n", "et = datetime(2023, 6, 1)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Use `to_spark_dataframe()` to get a dataframe for the defined feature window from the feature set specification defined above using DSL expressions." ] }, { "cell_type": "code", "execution_count": null, "metadata": { "collapsed": false, "gather": { "logged": 1709235504320 }, "jupyter": { "outputs_hidden": false, "source_hidden": false }, "name": "sparkdf-dsl-parq", "nteract": { "transient": { "deleting": false } } }, "outputs": [], "source": [ "dsl_df = dsl_feature_set_spec.to_spark_dataframe(\n", "    feature_window_start_date_time=st, feature_window_end_date_time=et\n", ")" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Print some sample feature values from the feature set defined using DSL expressions." ] }, { "cell_type": "code", "execution_count": null, "metadata": { "collapsed": false, "gather": { "logged": 1709235699953 }, "jupyter": { "outputs_hidden": false, "source_hidden": false }, "name": "display-dsl-parq", "nteract": { "transient": { "deleting": false } } }, "outputs": [], "source": [ "display(dsl_df)" ] }, { "cell_type": "markdown", "metadata": { "nteract": { "transient": { "deleting": false } } }, "source": [ "## [Optional] Create a feature set specification with a custom source using DSL expressions\n", "Execute the following code cell to create a feature set specification using a custom source and the supported DSL expressions."
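, "\n", "\n", "The custom source in the next cell delegates data loading to a `source_process_code` class (`source_process.CustomSourceTransformer`, included with the sample). As a rough sketch, assuming the `process(start_time, end_time, **kwargs)` contract used for custom sources, such a class might look like the following:\n", "\n", "```python\n", "from pyspark.sql import SparkSession\n", "from pyspark.sql import functions as F\n", "\n", "\n", "class CustomSourceTransformer:\n", "    def __init__(self, **kwargs):\n", "        # kwargs are forwarded from CustomFeatureSource(kwargs={...})\n", "        self.path = kwargs.get(\"source_path\")\n", "        self.timestamp_column_name = kwargs.get(\"timestamp_column_name\")\n", "        if not self.path:\n", "            raise ValueError(\"`source_path` is required\")\n", "\n", "    def process(self, start_time, end_time, **kwargs):\n", "        # Load the raw JSON source, then trim it to the requested feature window\n", "        spark = SparkSession.builder.getOrCreate()\n", "        df = spark.read.json(self.path)\n", "        if start_time:\n", "            df = df.filter(F.col(self.timestamp_column_name) >= F.lit(start_time))\n", "        if end_time:\n", "            df = df.filter(F.col(self.timestamp_column_name) < F.lit(end_time))\n", "        return df\n", "```"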
] }, { "cell_type": "code", "execution_count": null, "metadata": { "collapsed": false, "gather": { "logged": 1709235546455 }, "jupyter": { "outputs_hidden": false, "source_hidden": false }, "name": "create-dsl-cust-fset", "nteract": { "transient": { "deleting": false } } }, "outputs": [], "source": [ "from azureml.featurestore import create_feature_set_spec\n", "from azureml.featurestore.contracts.feature import Feature\n", "from azureml.featurestore.feature_source import CustomFeatureSource\n", "from azureml.featurestore.transformation import (\n", "    TransformationExpressionCollection,\n", "    WindowAggregation,\n", ")\n", "from azureml.featurestore.contracts import (\n", "    DateTimeOffset,\n", "    SourceProcessCode,\n", "    TransformationCode,\n", "    Column,\n", "    ColumnType,\n", "    SourceType,\n", "    TimestampColumn,\n", ")\n", "\n", "transactions_source_process_code_path = (\n", "    root_dir\n", "    + \"/featurestore/featuresets/transactions_custom_source/source_process_code\"\n", ")\n", "\n", "dsl_custom_feature_set_spec = create_feature_set_spec(\n", "    source=CustomFeatureSource(\n", "        kwargs={\n", "            \"source_path\": \"wasbs://data@azuremlexampledata.blob.core.windows.net/feature-store-prp/datasources/transactions-source-json/*.json\",\n", "            \"timestamp_column_name\": \"timestamp\",\n", "        },\n", "        timestamp_column=TimestampColumn(name=\"timestamp\"),\n", "        source_delay=DateTimeOffset(days=0, hours=0, minutes=20),\n", "        source_process_code=SourceProcessCode(\n", "            path=transactions_source_process_code_path,\n", "            process_class=\"source_process.CustomSourceTransformer\",\n", "        ),\n", "    ),\n", "    index_columns=[Column(name=\"accountID\", type=ColumnType.string)],\n", "    features=[\n", "        Feature(name=\"f_transaction_3d_count\", type=ColumnType.integer),\n", "        Feature(name=\"f_transaction_amount_3d_sum\", type=ColumnType.double),\n", "        Feature(name=\"f_transaction_amount_3d_avg\", type=ColumnType.double),\n", "        Feature(name=\"f_transaction_7d_count\", type=ColumnType.integer),\n", "        Feature(name=\"f_transaction_amount_7d_sum\", type=ColumnType.double),\n", "        Feature(name=\"f_transaction_amount_7d_avg\", type=ColumnType.double),\n", "    ],\n", "    feature_transformation=TransformationExpressionCollection(\n", "        transformation_expressions=[\n", "            WindowAggregation(\n", "                feature_name=\"f_transaction_3d_count\",\n", "                aggregation=\"count\",\n", "                window=DateTimeOffset(days=3),\n", "            ),\n", "            WindowAggregation(\n", "                feature_name=\"f_transaction_amount_3d_sum\",\n", "                source_column=\"transactionAmount\",\n", "                aggregation=\"sum\",\n", "                window=DateTimeOffset(days=3),\n", "            ),\n", "            WindowAggregation(\n", "                feature_name=\"f_transaction_amount_3d_avg\",\n", "                source_column=\"transactionAmount\",\n", "                aggregation=\"avg\",\n", "                window=DateTimeOffset(days=3),\n", "            ),\n", "            WindowAggregation(\n", "                feature_name=\"f_transaction_7d_count\",\n", "                aggregation=\"count\",\n", "                window=DateTimeOffset(days=7),\n", "            ),\n", "            WindowAggregation(\n", "                feature_name=\"f_transaction_amount_7d_sum\",\n", "                source_column=\"transactionAmount\",\n", "                aggregation=\"sum\",\n", "                window=DateTimeOffset(days=7),\n", "            ),\n", "            WindowAggregation(\n", "                feature_name=\"f_transaction_amount_7d_avg\",\n", "                source_column=\"transactionAmount\",\n", "                aggregation=\"avg\",\n", "                window=DateTimeOffset(days=7),\n", "            ),\n", "        ]\n", "    ),\n", ")\n", "\n", "dsl_custom_feature_set_spec" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Use `to_spark_dataframe()` to get a dataframe for the defined feature window from the feature set 
specification defined above using a custom source and DSL expressions." ] }, { "cell_type": "code", "execution_count": null, "metadata": { "collapsed": false, "gather": { "logged": 1709235571054 }, "jupyter": { "outputs_hidden": false, "source_hidden": false }, "name": "sparkdf-dsl-cust", "nteract": { "transient": { "deleting": false } } }, "outputs": [], "source": [ "custom_source_dsl_df = dsl_custom_feature_set_spec.to_spark_dataframe(\n", "    feature_window_start_date_time=st, feature_window_end_date_time=et\n", ")" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Next, print some sample feature values from the feature set defined using a custom source and DSL expressions." ] }, { "cell_type": "code", "execution_count": null, "metadata": { "collapsed": false, "gather": { "logged": 1709235669873 }, "jupyter": { "outputs_hidden": false, "source_hidden": false }, "name": "display-dsl-cust", "nteract": { "transient": { "deleting": false } } }, "outputs": [], "source": [ "display(custom_source_dsl_df)" ] }, { "cell_type": "markdown", "metadata": { "nteract": { "transient": { "deleting": false } } }, "source": [ "## Create a feature set specification using UDF\n", "Now, create a feature set specification that performs the same transformations as the DSL expressions by using a UDF." ] }, { "cell_type": "code", "execution_count": null, "metadata": { "collapsed": false, "gather": { "logged": 1709235599244 }, "jupyter": { "outputs_hidden": false, "source_hidden": false }, "name": "create-udf-parq-fset", "nteract": { "transient": { "deleting": false } } }, "outputs": [], "source": [ "from azureml.featurestore import create_feature_set_spec\n", "from azureml.featurestore.contracts import (\n", "    DateTimeOffset,\n", "    TransformationCode,\n", "    Column,\n", "    ColumnType,\n", "    SourceType,\n", "    TimestampColumn,\n", ")\n", "from azureml.featurestore.feature_source import ParquetFeatureSource\n", "\n", "transactions_featureset_code_path = (\n", "    root_dir + \"/featurestore/featuresets/transactions/transformation_code\"\n", ")\n", "\n", "udf_feature_set_spec = create_feature_set_spec(\n", "    source=ParquetFeatureSource(\n", "        path=\"wasbs://data@azuremlexampledata.blob.core.windows.net/feature-store-prp/datasources/transactions-source/*.parquet\",\n", "        timestamp_column=TimestampColumn(name=\"timestamp\"),\n", "        source_delay=DateTimeOffset(days=0, hours=0, minutes=20),\n", "    ),\n", "    transformation_code=TransformationCode(\n", "        path=transactions_featureset_code_path,\n", "        transformer_class=\"transaction_transform.TransactionFeatureTransformer\",\n", "    ),\n", "    index_columns=[Column(name=\"accountID\", type=ColumnType.string)],\n", "    infer_schema=True,\n", ")\n", "\n", "udf_feature_set_spec" ] }, { "cell_type": "markdown", "metadata": { "nteract": { "transient": { "deleting": false } } }, "source": [ "The transformation code is provided here to show that the UDF defines the same transformations as the DSL expressions."
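, " Note that the UDF produces feature names without the `f_` prefix (for example, `transaction_7d_count`), while the DSL specification above prefixes them (for example, `f_transaction_7d_count`). Keep this in mind when you select features from the two feature sets later in this tutorial."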
] }, { "cell_type": "markdown", "metadata": { "nteract": { "transient": { "deleting": false } } }, "source": [ "```python\n", "from pyspark.ml import Transformer\n", "from pyspark.sql import DataFrame\n", "from pyspark.sql import functions as F\n", "from pyspark.sql.window import Window\n", "\n", "\n", "class TransactionFeatureTransformer(Transformer):\n", "    def _transform(self, df: DataFrame) -> DataFrame:\n", "        days = lambda i: i * 86400\n", "        w_3d = (\n", "            Window.partitionBy(\"accountID\")\n", "            .orderBy(F.col(\"timestamp\").cast(\"long\"))\n", "            .rangeBetween(-days(3), 0)\n", "        )\n", "        w_7d = (\n", "            Window.partitionBy(\"accountID\")\n", "            .orderBy(F.col(\"timestamp\").cast(\"long\"))\n", "            .rangeBetween(-days(7), 0)\n", "        )\n", "        res = (\n", "            df.withColumn(\"transaction_7d_count\", F.count(\"transactionID\").over(w_7d))\n", "            .withColumn(\n", "                \"transaction_amount_7d_sum\", F.sum(\"transactionAmount\").over(w_7d)\n", "            )\n", "            .withColumn(\n", "                \"transaction_amount_7d_avg\", F.avg(\"transactionAmount\").over(w_7d)\n", "            )\n", "            .withColumn(\"transaction_3d_count\", F.count(\"transactionID\").over(w_3d))\n", "            .withColumn(\n", "                \"transaction_amount_3d_sum\", F.sum(\"transactionAmount\").over(w_3d)\n", "            )\n", "            .withColumn(\n", "                \"transaction_amount_3d_avg\", F.avg(\"transactionAmount\").over(w_3d)\n", "            )\n", "            .select(\n", "                \"accountID\",\n", "                \"timestamp\",\n", "                \"transaction_3d_count\",\n", "                \"transaction_amount_3d_sum\",\n", "                \"transaction_amount_3d_avg\",\n", "                \"transaction_7d_count\",\n", "                \"transaction_amount_7d_sum\",\n", "                \"transaction_amount_7d_avg\",\n", "            )\n", "        )\n", "        return res\n", "\n", "```" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Use `to_spark_dataframe()` to get a dataframe from the feature set specification defined above using the UDF." ] }, { "cell_type": "code", "execution_count": null, "metadata": { "collapsed": false, "gather": { "logged": 1709235617470 }, "jupyter": { "outputs_hidden": false, "source_hidden": false }, "name": "sparkdf-udf-parq", "nteract": { "transient": { "deleting": false } } }, "outputs": [], "source": [ "udf_df = udf_feature_set_spec.to_spark_dataframe(\n", "    feature_window_start_date_time=st, feature_window_end_date_time=et\n", ")" ] }, { "cell_type": "markdown", "metadata": { "nteract": { "transient": { "deleting": false } } }, "source": [ "Compare the results to verify consistency between the DSL expressions and the transformations performed by the UDF. To verify, select one of the `accountID` values and compare the values in the two dataframes." ] }, { "cell_type": "code", "execution_count": null, "metadata": { "collapsed": false, "gather": { "logged": 1709235627238 }, "jupyter": { "outputs_hidden": false, "source_hidden": false }, "name": "display-dsl-acct", "nteract": { "transient": { "deleting": false } } }, "outputs": [], "source": [ "display(dsl_df.where(dsl_df.accountID == \"A1899946977632390\").sort(\"timestamp\"))" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "collapsed": false, "gather": { "logged": 1709235641198 }, "jupyter": { "outputs_hidden": false, "source_hidden": false }, "name": "display-udf-acct", "nteract": { "transient": { "deleting": false } } }, "outputs": [], "source": [ "display(udf_df.where(udf_df.accountID == \"A1899946977632390\").sort(\"timestamp\"))" ] }, { "cell_type": "markdown", "metadata": { "nteract": { "transient": { "deleting": false } } }, "source": [ "### Export as feature set specification\n", "To register the feature set specification with the feature store, it must be saved in a specific format. 
\n", "Action: Please inspect the generated `transactions-dsl` feature set specification: Open this file from the file tree to see the specification: `featurestore/featuresets/transactions-dsl/spec/FeaturesetSpec.yaml`\n", "\n", "The feature set specification contains these important elements:\n", "\n", "1. `source`: Reference to a storage. In this case a parquet file in a blob storage.\n", "2. `features`: List of features and their datatypes. If you provide transformation code, the code has to return a dataframe that maps to the features and data types.\n", "3. `index_columns`: The join keys required to access values from the feature set\n", "\n", "Learn more about it in the [top level feature store entities document](https://learn.microsoft.com/azure/machine-learning/concept-top-level-entities-in-managed-feature-store) and the [feature set specification YAML reference](https://learn.microsoft.com/azure/machine-learning/reference-yaml-featureset-spec).\n", "\n", "The additional benefit of persisting the feature set specification is that it can be source controlled." ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Execute the following code cell to write YAML specification file for the feature set using parquet data source and DSL expressions. " ] }, { "cell_type": "code", "execution_count": null, "metadata": { "collapsed": false, "gather": { "logged": 1709235728242 }, "jupyter": { "outputs_hidden": false, "source_hidden": false }, "name": "dump-dsl-parq-fset-spec", "nteract": { "transient": { "deleting": false } } }, "outputs": [], "source": [ "dsl_spec_folder = root_dir + \"/featurestore/featuresets/transactions-dsl/spec\"\n", "\n", "dsl_feature_set_spec.dump(dsl_spec_folder, overwrite=True)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Execute the following code cell to write YAML specification file for the feature set using custom source and DSL expressions. " ] }, { "cell_type": "code", "execution_count": null, "metadata": { "collapsed": false, "gather": { "logged": 1709235731389 }, "jupyter": { "outputs_hidden": false, "source_hidden": false }, "name": "dump-dsl-custom-fset-spec", "nteract": { "transient": { "deleting": false } } }, "outputs": [], "source": [ "custom_source_dsl_spec_folder = (\n", " root_dir + \"/featurestore/featuresets/transactions-dsl/custom_source_spec\"\n", ")\n", "\n", "dsl_custom_feature_set_spec.dump(custom_source_dsl_spec_folder, overwrite=True)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Execute the following code cell to write YAML specification file for the feature set using UDF. " ] }, { "cell_type": "code", "execution_count": null, "metadata": { "collapsed": false, "gather": { "logged": 1709235734382 }, "jupyter": { "outputs_hidden": false, "source_hidden": false }, "name": "dump-udf-parq-fset-spec", "nteract": { "transient": { "deleting": false } } }, "outputs": [], "source": [ "udf_spec_folder = root_dir + \"/featurestore/featuresets/transactions-udf/spec\"\n", "\n", "udf_feature_set_spec.dump(udf_spec_folder, overwrite=True)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Initialize feature store workspace CRUD client and feature store core SDK client \n", "You will be using two SDKs:\n", "\n", "1. Feature store CRUD SDK: You will use the AzureML SDK `MLClient` (package name `azure-ai-ml`), similar to the one you use with Azure ML workspace. This will be used for feature store CRUD operations (create, read, update, and delete) for feature store, feature set and feature store entities. 
This is because a feature store is implemented as a type of workspace.\n", "2. Feature store core SDK: This SDK (`azureml-featurestore`) is meant to be used for feature set development and consumption." ] }, { "cell_type": "code", "execution_count": null, "metadata": { "collapsed": false, "gather": { "logged": 1709235739851 }, "jupyter": { "outputs_hidden": false, "source_hidden": false }, "name": "init-python-clients", "nteract": { "transient": { "deleting": false } } }, "outputs": [], "source": [ "from azure.ai.ml import MLClient\n", "from azure.ai.ml.entities import (\n", "    FeatureStore,\n", "    FeatureStoreEntity,\n", "    FeatureSet,\n", ")\n", "from azure.ai.ml.identity import AzureMLOnBehalfOfCredential\n", "from azureml.featurestore import FeatureStoreClient\n", "\n", "fs_client = MLClient(\n", "    AzureMLOnBehalfOfCredential(),\n", "    featurestore_subscription_id,\n", "    featurestore_resource_group_name,\n", "    featurestore_name,\n", ")\n", "\n", "featurestore = FeatureStoreClient(\n", "    credential=AzureMLOnBehalfOfCredential(),\n", "    subscription_id=featurestore_subscription_id,\n", "    resource_group_name=featurestore_resource_group_name,\n", "    name=featurestore_name,\n", ")" ] }, { "cell_type": "markdown", "metadata": { "nteract": { "transient": { "deleting": false } } }, "source": [ "## Register the `account` entity with the feature store\n", "Create an `account` entity that has the join key `accountID` of type `string`. " ] }, { "cell_type": "code", "execution_count": null, "metadata": { "collapsed": false, "gather": { "logged": 1709235748177 }, "jupyter": { "outputs_hidden": false, "source_hidden": false }, "name": "register-account-entity", "nteract": { "transient": { "deleting": false } } }, "outputs": [], "source": [ "from azure.ai.ml.entities import DataColumn\n", "\n", "account_entity_config = FeatureStoreEntity(\n", "    name=\"account\",\n", "    version=\"1\",\n", "    index_columns=[DataColumn(name=\"accountID\", type=\"string\")],\n", ")\n", "\n", "poller = fs_client.feature_store_entities.begin_create_or_update(account_entity_config)\n", "print(poller.result())" ] }, { "cell_type": "markdown", "metadata": { "nteract": { "transient": { "deleting": false } } }, "source": [ "## Register the feature set with the feature store using the exported feature set specification\n", "Register the `transactions-dsl` feature set (the one that uses DSL expressions) with the feature store, with offline materialization enabled, using the exported feature set specification."
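, "\n", "\n", "In the materialization settings below, `schedule=None` means no recurrent materialization is configured; feature data is materialized only when you explicitly trigger a backfill job. As a sketch, after the `materialization_settings` object is created in the next cell, a recurring schedule could be attached as follows (recurrent materialization is covered in Tutorial 3):\n", "\n", "```python\n", "from datetime import datetime\n", "\n", "from azure.ai.ml.entities import RecurrenceTrigger\n", "\n", "# Illustrative: materialize new feature data every three hours\n", "materialization_settings.schedule = RecurrenceTrigger(\n", "    interval=3, frequency=\"Hour\", start_time=datetime(2023, 6, 1)\n", ")\n", "```"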
] }, { "cell_type": "code", "execution_count": null, "metadata": { "collapsed": false, "gather": { "logged": 1709235794359 }, "jupyter": { "outputs_hidden": false, "source_hidden": false }, "name": "register-dsl-trans-fset", "nteract": { "transient": { "deleting": false } } }, "outputs": [], "source": [ "from azure.ai.ml.entities import (\n", "    FeatureSet,\n", "    FeatureSetSpecification,\n", "    MaterializationSettings,\n", "    MaterializationComputeResource,\n", ")\n", "\n", "materialization_settings = MaterializationSettings(\n", "    offline_enabled=True,\n", "    resource=MaterializationComputeResource(instance_type=\"standard_e8s_v3\"),\n", "    spark_configuration={\n", "        \"spark.driver.cores\": 4,\n", "        \"spark.driver.memory\": \"36g\",\n", "        \"spark.executor.cores\": 4,\n", "        \"spark.executor.memory\": \"36g\",\n", "        \"spark.executor.instances\": 2,\n", "    },\n", "    schedule=None,\n", ")\n", "\n", "fset_config = FeatureSet(\n", "    name=\"transactions-dsl\",\n", "    version=\"1\",\n", "    entities=[\"azureml:account:1\"],\n", "    stage=\"Development\",\n", "    specification=FeatureSetSpecification(path=dsl_spec_folder),\n", "    materialization_settings=materialization_settings,\n", "    tags={\"data_type\": \"nonPII\"},\n", ")\n", "\n", "poller = fs_client.feature_sets.begin_create_or_update(fset_config)\n", "print(poller.result())" ] }, { "cell_type": "markdown", "metadata": { "nteract": { "transient": { "deleting": false } } }, "source": [ "Now, materialize the feature set to persist the transformed feature data to the offline store." ] }, { "cell_type": "code", "execution_count": null, "metadata": { "collapsed": false, "gather": { "logged": 1709235979507 }, "jupyter": { "outputs_hidden": false, "source_hidden": false }, "name": "mater-dsl-trans-fset", "nteract": { "transient": { "deleting": false } } }, "outputs": [], "source": [ "poller = fs_client.feature_sets.begin_backfill(\n", "    name=\"transactions-dsl\",\n", "    version=\"1\",\n", "    feature_window_start_time=st,\n", "    feature_window_end_time=et,\n", "    spark_configuration={},\n", "    data_status=[\"None\", \"Incomplete\"],\n", ")\n", "print(poller.result().job_ids)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "You can track the progress of the materialization job by executing the following code cell." ] }, { "cell_type": "code", "execution_count": null, "metadata": { "collapsed": false, "gather": { "logged": 1709237497262 }, "jupyter": { "outputs_hidden": false, "source_hidden": false }, "name": "track-mater-job", "nteract": { "transient": { "deleting": false } } }, "outputs": [], "source": [ "# Get the job URL, and stream the job logs (the backfill job could take 10+ minutes to complete)\n", "fs_client.jobs.stream(poller.result().job_ids[0])" ] }, { "cell_type": "markdown", "metadata": { "nteract": { "transient": { "deleting": false } } }, "source": [ "Next, print sample data from the feature set. Notice from the output information that the data was retrieved from the materialization store. The `get_offline_features()` method, which is used to retrieve training/inference data, also uses the materialization store by default."
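, "\n", "\n", "If the read in the next cell fails with a permission error, verify that the **Storage Blob Data Reader** role assignment from the earlier step has finished propagating. Also, as a sketch, assuming the registered feature set's `to_spark_dataframe()` accepts the same feature-window arguments as the feature set specification used earlier, you could restrict the read to a specific window:\n", "\n", "```python\n", "# Illustrative: read only the feature window materialized earlier\n", "df = transactions_featureset.to_spark_dataframe(\n", "    feature_window_start_date_time=st, feature_window_end_date_time=et\n", ")\n", "display(df.head(5))\n", "```"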
] }, { "cell_type": "code", "execution_count": null, "metadata": { "collapsed": false, "gather": { "logged": 1709237539288 }, "jupyter": { "outputs_hidden": false, "source_hidden": false }, "name": "lookup-trans-dsl-fset", "nteract": { "transient": { "deleting": false } } }, "outputs": [], "source": [ "# Look up the feature set by providing its name and version\n", "transactions_featureset = featurestore.feature_sets.get(\"transactions-dsl\", \"1\")\n", "display(transactions_featureset.to_spark_dataframe().head(5))" ] }, { "cell_type": "markdown", "metadata": { "nteract": { "transient": { "deleting": false } } }, "source": [ "## Generate a training dataframe using the registered feature set" ] }, { "cell_type": "markdown", "metadata": { "nteract": { "transient": { "deleting": false } } }, "source": [ "### Load observation data\n", "\n", "Start by exploring the observation data. Observation data is typically the core data used in training and inference steps, which is then joined with the feature data to create the complete training data. Observation data is the data captured at the time of the event: in this case, it has core transaction data including transaction ID, account ID, and transaction amount. Since this data is used for training, it also has the target variable appended (`is_fraud`)." ] }, { "cell_type": "code", "execution_count": null, "metadata": { "collapsed": false, "gather": { "logged": 1709237542890 }, "jupyter": { "outputs_hidden": false, "source_hidden": false }, "name": "load-obs-data", "nteract": { "transient": { "deleting": false } } }, "outputs": [], "source": [ "observation_data_path = \"wasbs://data@azuremlexampledata.blob.core.windows.net/feature-store-prp/observation_data/train/*.parquet\"\n", "observation_data_df = spark.read.parquet(observation_data_path)\n", "obs_data_timestamp_column = \"timestamp\"\n", "\n", "display(observation_data_df)\n", "# Note: the timestamp column is displayed in a different format. Optionally, you can call observation_data_df.show() to see the correctly formatted values" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Select the features to include in the training data, and use the feature store SDK to generate the training data." ] }, { "cell_type": "code", "execution_count": null, "metadata": { "collapsed": false, "gather": { "logged": 1709237569514 }, "jupyter": { "outputs_hidden": false, "source_hidden": false }, "name": "select-features-dsl", "nteract": { "transient": { "deleting": false } } }, "outputs": [], "source": [ "featureset = featurestore.feature_sets.get(\"transactions-dsl\", \"1\")\n", "\n", "# You can select features in a pythonic way\n", "features = [\n", "    featureset.get_feature(\"f_transaction_amount_7d_sum\"),\n", "    featureset.get_feature(\"f_transaction_amount_7d_avg\"),\n", "]\n", "\n", "# You can also specify features in string form: featureset:version:feature\n", "more_features = [\n", "    \"transactions-dsl:1:f_transaction_amount_3d_sum\",\n", "    \"transactions-dsl:1:f_transaction_3d_count\",\n", "]\n", "\n", "more_features = featurestore.resolve_feature_uri(more_features)\n", "features.extend(more_features)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "The function `get_offline_features()` appends the features to the observation data using a point-in-time join. Display the training dataframe obtained from the point-in-time join."
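, " For example, an observation row for a hypothetical account `A` with timestamp `2023-01-10` is joined with the latest feature values computed at or before `2023-01-10`; feature values computed after that timestamp are never used, which prevents target leakage in the training data."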
] }, { "cell_type": "code", "execution_count": null, "metadata": { "collapsed": false, "gather": { "logged": 1709237671740 }, "jupyter": { "outputs_hidden": false, "source_hidden": false }, "name": "get-offline-features-dsl", "nteract": { "transient": { "deleting": false } } }, "outputs": [], "source": [ "from azureml.featurestore import get_offline_features\n", "\n", "training_df = get_offline_features(\n", "    features=features,\n", "    observation_data=observation_data_df,\n", "    timestamp_column=obs_data_timestamp_column,\n", ")\n", "\n", "display(training_df.sort(\"transactionID\", \"accountID\", \"timestamp\"))" ] }, { "cell_type": "markdown", "metadata": { "nteract": { "transient": { "deleting": false } } }, "source": [ "### Generate a training dataframe using registered feature sets with different types of transformations" ] }, { "cell_type": "markdown", "metadata": { "nteract": { "transient": { "deleting": false } } }, "source": [ "Register the `transactions-udf` feature set (the one that uses a UDF) with the feature store, with offline materialization enabled, using the exported feature set specification." ] }, { "cell_type": "code", "execution_count": null, "metadata": { "collapsed": false, "gather": { "logged": 1709237792568 }, "jupyter": { "outputs_hidden": false, "source_hidden": false }, "name": "register-udf-trans-fset", "nteract": { "transient": { "deleting": false } } }, "outputs": [], "source": [ "fset_config = FeatureSet(\n", "    name=\"transactions-udf\",\n", "    version=\"1\",\n", "    entities=[\"azureml:account:1\"],\n", "    stage=\"Development\",\n", "    specification=FeatureSetSpecification(path=udf_spec_folder),\n", "    materialization_settings=materialization_settings,\n", "    tags={\"data_type\": \"nonPII\"},\n", ")\n", "\n", "poller = fs_client.feature_sets.begin_create_or_update(fset_config)\n", "print(poller.result())" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "In this step, select the features that you would like to include in the training data from the feature sets created using different types of transformations, and use the feature store SDK to generate the training data." ] }, { "cell_type": "code", "execution_count": null, "metadata": { "collapsed": false, "gather": { "logged": 1709238117290 }, "jupyter": { "outputs_hidden": false, "source_hidden": false }, "name": "select-features-dsl-udf", "nteract": { "transient": { "deleting": false } } }, "outputs": [], "source": [ "featureset_dsl = featurestore.feature_sets.get(\"transactions-dsl\", \"1\")\n", "featureset_udf = featurestore.feature_sets.get(\"transactions-udf\", \"1\")\n", "\n", "# You can select features in a pythonic way\n", "features = [\n", "    featureset_dsl.get_feature(\"f_transaction_amount_7d_sum\"),\n", "    featureset_udf.get_feature(\"transaction_amount_7d_avg\"),\n", "]\n", "\n", "# You can also specify features in string form: featureset:version:feature\n", "more_features = [\n", "    \"transactions-udf:1:transaction_amount_3d_sum\",\n", "    \"transactions-dsl:1:f_transaction_3d_count\",\n", "]\n", "\n", "more_features = featurestore.resolve_feature_uri(more_features)\n", "features.extend(more_features)" ] }, { "cell_type": "markdown", "metadata": { "nteract": { "transient": { "deleting": false } } }, "source": [ "The function `get_offline_features()` appends the features to the observation data using a point-in-time join. Display the training dataframe obtained from the point-in-time join."
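, "\n", "\n", "Observation rows that fall before the earliest available feature data may receive null feature values after the join. As a minimal sketch, you could count the null feature values per column:\n", "\n", "```python\n", "from pyspark.sql import functions as F\n", "\n", "# Count null values per column in the joined training dataframe\n", "null_counts = training_df.select(\n", "    [F.count(F.when(F.col(c).isNull(), c)).alias(c) for c in training_df.columns]\n", ")\n", "display(null_counts)\n", "```"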
] }, { "cell_type": "code", "execution_count": null, "metadata": { "collapsed": false, "gather": { "logged": 1709238213028 }, "jupyter": { "outputs_hidden": false, "source_hidden": false }, "name": "get-offline-features-dsl-udf", "nteract": { "transient": { "deleting": false } } }, "outputs": [], "source": [ "from azureml.featurestore import get_offline_features\n", "\n", "training_df = get_offline_features(\n", "    features=features,\n", "    observation_data=observation_data_df,\n", "    timestamp_column=obs_data_timestamp_column,\n", ")\n", "\n", "display(training_df.sort(\"transactionID\", \"accountID\", \"timestamp\"))" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "You can see how the features are appended to the training data using a point-in-time join." ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Cleanup\n", "\n", "The tutorial notebook `5. Develop a feature set with custom source` has instructions for deleting the resources." ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Next steps\n", "* [Tutorial 2: Experiment and train models using features](https://learn.microsoft.com/azure/machine-learning/tutorial-experiment-train-models-using-features)\n", "* [Tutorial 3: Enable recurrent materialization and run batch inference](https://learn.microsoft.com/azure/machine-learning/tutorial-enable-recurrent-materialization-run-batch-inference)" ] } ], "metadata": { "kernelspec": { "display_name": "fs_env", "language": "python", "name": "fs_env" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 3 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.11.5" } }, "nbformat": 4, "nbformat_minor": 4 }