sdk/python/featurestore_sample/notebooks/sdk_only/5.Develop-feature-set-custom-source.ipynb (512 lines of code) (raw):
{
"cells": [
{
"attachments": {},
"cell_type": "markdown",
"metadata": {},
"source": [
"# Tutorial #5: Develop a feature set with a custom source\n",
"Managed feature store supports defining a custom source for data. A custom source definition allows you to define their own logic to load data from any data storage. This allows support for complex scenarios, such as\n",
"- Loading data from multiple tables with a complex join logic.\n",
"- Loading data efficiently from data sources that have a custom partition format.\n",
"- Support for data sources that do not use natively supported formats, e.g: parquet, `MLTable` and delta table. \n",
" \n",
"In this tutorial you will configure a feature set to consume data from a user-defined custom data source."
]
},
{
"attachments": {},
"cell_type": "markdown",
"metadata": {},
"source": [
"# Prerequisites\n",
"\n",
"1. Before proceeding, please ensure that you have already completed previous three tutorials of this series. We will be reusing feature store and some other resources created in the previous tutorials."
]
},
{
"cell_type": "markdown",
"metadata": {
"nteract": {
"transient": {
"deleting": false
}
}
},
"source": [
"## Setup\n"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"#### Configure Azure ML spark notebook\n",
"\n",
"1. In the \"Compute\" dropdown in the top nav, select \"Serverless Spark Compute\". \n",
"1. Click on \"configure session\" in top status bar -> click on \"Python packages\" -> click on \"upload conda file\" -> select the file azureml-examples/sdk/python/featurestore-sample/project/env/conda.yml from your local machine; Also increase the session time out (idle time) if you want to avoid running the prerequisites frequently\n",
"\n",
"\n"
]
},
{
"attachments": {},
"cell_type": "markdown",
"metadata": {},
"source": [
"## Setup root directory for the samples\n",
"This code cell sets up the root directory for the samples. It may needs about 10 minutes to execute this cell as it also installs all Conda dependencies and starts the Spark session."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"gather": {
"logged": 1691710922464
},
"jupyter": {
"outputs_hidden": false,
"source_hidden": false
},
"name": "root-dir",
"nteract": {
"transient": {
"deleting": false
}
}
},
"outputs": [],
"source": [
"import os\n",
"\n",
"# Please update the dir to ./Users/{your_user_alias} (or any custom directory you uploaded the samples to).\n",
"# You can find the name from the directory structure in the left navigation panel.\n",
"root_dir = \"./Users/<your_user_alias>/featurestore_sample\"\n",
"\n",
"if os.path.isdir(root_dir):\n",
" print(\"The folder exists.\")\n",
"else:\n",
" print(\"The folder does not exist. Please create or fix the path\")"
]
},
{
"attachments": {},
"cell_type": "markdown",
"metadata": {},
"source": [
"## Initialize the CRUD client of the feature store workspace\n",
" Initialize the `MLClient` for the feature store workspace, for the create, read, update, and delete (CRUD) operations on the feature store workspace."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"gather": {
"logged": 1691711012673
},
"name": "init-fset-crud-client"
},
"outputs": [],
"source": [
"from azure.ai.ml import MLClient\n",
"from azure.ai.ml.identity import AzureMLOnBehalfOfCredential\n",
"\n",
"# Feature store\n",
"featurestore_name = (\n",
" \"<FEATURESTORE_NAME>\" # use the same name that was used in the tutorial #1\n",
")\n",
"featurestore_subscription_id = os.environ[\"AZUREML_ARM_SUBSCRIPTION\"]\n",
"featurestore_resource_group_name = os.environ[\"AZUREML_ARM_RESOURCEGROUP\"]\n",
"\n",
"# Feature store ml client\n",
"fs_client = MLClient(\n",
" AzureMLOnBehalfOfCredential(),\n",
" featurestore_subscription_id,\n",
" featurestore_resource_group_name,\n",
" featurestore_name,\n",
")"
]
},
{
"attachments": {},
"cell_type": "markdown",
"metadata": {},
"source": [
"## Initialize the feature store core SDK client\n",
"As mentioned earlier, this tutorial uses the Python feature store core SDK (`azureml-featurestore`). This initialized SDK client is used for create, read, update, and delete (CRUD) operations, on feature stores, feature sets, and feature store entities."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"gather": {
"logged": 1691711017028
},
"name": "init-fs-core-sdk"
},
"outputs": [],
"source": [
"from azureml.featurestore import FeatureStoreClient\n",
"from azure.ai.ml.identity import AzureMLOnBehalfOfCredential\n",
"\n",
"featurestore = FeatureStoreClient(\n",
" credential=AzureMLOnBehalfOfCredential(),\n",
" subscription_id=featurestore_subscription_id,\n",
" resource_group_name=featurestore_resource_group_name,\n",
" name=featurestore_name,\n",
")"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Custom source definition\n",
"Custom source definition enables you to define your own source loading logic from any data storage. For using this feature, implement a a source processor user-defined function (UDF) class (`CustomSourceTransformer` in this tutorial). This class should define an `__init__(self, **kwargs)` function and a `process(self, start_time, end_time, **kwargs)` function. The `kwargs` dictionary is supplied as a part of the feature set specification definition, which is passed to the UDF. The `start_time` and `end_time` parameters are calculated and passed to the UDF function.\n",
"\n",
"Below is a sample code of source processor UDF class:\n",
"\n",
"```python\n",
"from datetime import datetime\n",
"\n",
"\n",
"class CustomSourceTransformer:\n",
" def __init__(self, **kwargs):\n",
" self.path = kwargs.get(\"source_path\")\n",
" self.timestamp_column_name = kwargs.get(\"timestamp_column_name\")\n",
" if not self.path:\n",
" raise Exception(\"`source_path` is not provided\")\n",
" if not self.timestamp_column_name:\n",
" raise Exception(\"`timestamp_column_name` is not provided\")\n",
"\n",
" def process(\n",
" self, start_time: datetime, end_time: datetime, **kwargs\n",
" ) -> \"pyspark.sql.DataFrame\":\n",
" from pyspark.sql import SparkSession\n",
" from pyspark.sql.functions import col, lit, to_timestamp\n",
"\n",
" spark = SparkSession.builder.getOrCreate()\n",
" df = spark.read.json(self.path)\n",
"\n",
" if start_time:\n",
" df = df.filter(col(self.timestamp_column_name) >= to_timestamp(lit(start_time)))\n",
"\n",
" if end_time:\n",
" df = df.filter(col(self.timestamp_column_name) < to_timestamp(lit(end_time)))\n",
"\n",
" return df\n",
"\n",
"```"
]
},
{
"attachments": {},
"cell_type": "markdown",
"metadata": {},
"source": [
"# Create a feature set specification with custom source and experiment with it locally\n",
"Now, create a feature set specification with custom source definition and use your development environment to experiment with the feature set. The tutorial notebook attached to **Serverless Spark Compute** serves as the development environment."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"name": "create-fs-custom-src"
},
"outputs": [],
"source": [
"from azureml.featurestore import create_feature_set_spec\n",
"from azureml.featurestore.feature_source import CustomFeatureSource\n",
"from azureml.featurestore.contracts import (\n",
" SourceProcessCode,\n",
" TransformationCode,\n",
" Column,\n",
" ColumnType,\n",
" DateTimeOffset,\n",
" TimestampColumn,\n",
")\n",
"\n",
"transactions_source_process_code_path = (\n",
" root_dir\n",
" + \"/featurestore/featuresets/transactions_custom_source/source_process_code\"\n",
")\n",
"transactions_feature_transform_code_path = (\n",
" root_dir\n",
" + \"/featurestore/featuresets/transactions_custom_source/feature_process_code\"\n",
")\n",
"\n",
"udf_featureset_spec = create_feature_set_spec(\n",
" source=CustomFeatureSource(\n",
" kwargs={\n",
" \"source_path\": \"wasbs://data@azuremlexampledata.blob.core.windows.net/feature-store-prp/datasources/transactions-source-json/*.json\",\n",
" \"timestamp_column_name\": \"timestamp\",\n",
" },\n",
" timestamp_column=TimestampColumn(name=\"timestamp\"),\n",
" source_delay=DateTimeOffset(days=0, hours=0, minutes=20),\n",
" source_process_code=SourceProcessCode(\n",
" path=transactions_source_process_code_path,\n",
" process_class=\"source_process.CustomSourceTransformer\",\n",
" ),\n",
" ),\n",
" feature_transformation=TransformationCode(\n",
" path=transactions_feature_transform_code_path,\n",
" transformer_class=\"transaction_transform.TransactionFeatureTransformer\",\n",
" ),\n",
" index_columns=[Column(name=\"accountID\", type=ColumnType.string)],\n",
" source_lookback=DateTimeOffset(days=7, hours=0, minutes=0),\n",
" temporal_join_lookback=DateTimeOffset(days=1, hours=0, minutes=0),\n",
" infer_schema=True,\n",
")\n",
"\n",
"udf_featureset_spec"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Next, define a feature window and display feature values in this feature window."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"name": "display-features",
"tags": [
"active-ipynb"
]
},
"outputs": [],
"source": [
"from datetime import datetime\n",
"\n",
"st = datetime(2023, 1, 1)\n",
"et = datetime(2023, 6, 1)\n",
"\n",
"display(\n",
" udf_featureset_spec.to_spark_dataframe(\n",
" feature_window_start_date_time=st, feature_window_end_date_time=et\n",
" )\n",
")"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Export as a feature set specification\n",
"To register the feature set specification with the feature store, you must save that specification in a specific format. Review the generated `transactions_custom_source` feature set specification. Open this file from the file tree to see the specification: `featurestore/featuresets/transactions_custom_source/spec/FeaturesetSpec.yaml`.\n",
"\n",
"The specification contains these elements:\n",
"\n",
"- `features`: A list of features and their datatypes.\n",
"- `index_columns`: The join keys required to access values from the feature set.\n",
"\n",
"To learn more about the specification, see [Understanding top-level entities in managed feature store](https://learn.microsoft.com/azure/machine-learning/concept-top-level-entities-in-managed-feature-store) and [CLI (v2) feature set YAML schema](https://learn.microsoft.com/azure/machine-learning/reference-yaml-feature-set).\n",
"\n",
"Persisting the feature set specification offers another benefit: the feature set specification can be source controlled."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"name": "dump-txn-fs-spec"
},
"outputs": [],
"source": [
"feature_spec_folder = (\n",
" root_dir + \"/featurestore/featuresets/transactions_custom_source/spec\"\n",
")\n",
"\n",
"udf_featureset_spec.dump(feature_spec_folder)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Register the transaction feature set with the feature store\n",
"Use the following code to register a feature set asset loaded from custom source with the feature store. You can then reuse that asset and easily share it. Registration of a feature set asset offers managed capabilities, including versioning and materialization."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"gather": {
"logged": 1691712951072
},
"jupyter": {
"outputs_hidden": false,
"source_hidden": false
},
"name": "register-txn-fset",
"nteract": {
"transient": {
"deleting": false
}
}
},
"outputs": [],
"source": [
"from azure.ai.ml.entities import FeatureSet, FeatureSetSpecification\n",
"\n",
"transaction_fset_config = FeatureSet(\n",
" name=\"transactions_custom_source\",\n",
" version=\"1\",\n",
" description=\"transactions feature set loaded from custom source\",\n",
" entities=[\"azureml:account:1\"],\n",
" stage=\"Development\",\n",
" specification=FeatureSetSpecification(path=feature_spec_folder),\n",
" tags={\"data_type\": \"nonPII\"},\n",
")\n",
"\n",
"poller = fs_client.feature_sets.begin_create_or_update(transaction_fset_config)\n",
"print(poller.result())"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Get the registered feature set, and print related information."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"gather": {
"logged": 1691712961910
},
"jupyter": {
"outputs_hidden": false,
"source_hidden": false
},
"name": "get-txn-fset",
"nteract": {
"transient": {
"deleting": false
}
}
},
"outputs": [],
"source": [
"# Look up the feature set by providing name and version\n",
"transactions_fset_config = featurestore.feature_sets.get(\n",
" name=\"transactions_custom_source\", version=\"1\"\n",
")\n",
"# Print feature set information\n",
"print(transactions_fset_config)"
]
},
{
"attachments": {},
"cell_type": "markdown",
"metadata": {},
"source": [
"## Test feature generation from registered feature set\n",
"Test feature generation from the registered feature set by using `to_spark_dataframe()` function of the feature set, and display the features."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"gather": {
"logged": 1691712966166
},
"jupyter": {
"outputs_hidden": false,
"source_hidden": false
},
"name": "print-txn-fset-sample-values",
"nteract": {
"transient": {
"deleting": false
}
},
"tags": [
"active-ipynb"
]
},
"outputs": [],
"source": [
"df = transactions_fset_config.to_spark_dataframe()\n",
"display(df)"
]
},
{
"cell_type": "markdown",
"metadata": {
"nteract": {
"transient": {
"deleting": false
}
}
},
"source": [
"You should be able to successfully fetch the registered feature set as a Spark dataframe and display it. Now, you can use these features for a point-in-time with observation data and the subsequent steps in you machine learning pipeline. "
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Cleanup\n",
"If you created a resource group for the tutorials, you can delete the resource group to delete all the resources associated with this tutorial.\n",
"\n",
"Otherwise, you can delete the resources individually:\n",
"\n",
"* Delete the feature store: Go to the resource group in the Azure portal, select the feature store and delete it.\n",
"* Follow the instructions [here](https://review.learn.microsoft.com/en-us/azure/active-directory/managed-identities-azure-resources/how-manage-user-assigned-managed-identities?pivots=identity-mi-methods-azp&view=azureml-api-2#delete-a-user-assigned-managed-identity) to delete the user assigned managed identity.\n",
"* Delete the offline store (storage account): Go to the resource group in the Azure portal, select the storage you created and delete it.\n",
"* Delete the online store (Redis instance): Go to the resource group in the Azure portal, select the Redis instance you created and delete it."
]
}
],
"metadata": {
"celltoolbar": "Edit Metadata",
"kernel_info": {
"name": "synapse_pyspark"
},
"kernelspec": {
"display_name": "Python 3 (ipykernel)",
"language": "python",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.8.13"
},
"microsoft": {
"host": {
"AzureML": {
"notebookHasBeenCompleted": true
}
},
"ms_spell_check": {
"ms_spell_check_language": "en"
}
},
"nteract": {
"version": "nteract-front-end@1.0.0"
}
},
"nbformat": 4,
"nbformat_minor": 2
}