# Tutorial #1: Develop a feature set and register with managed feature store

Azure ML managed feature store lets you discover, create, and operationalize features. Features are the connective tissue in the ML lifecycle, from prototyping, where you experiment with various features, to operationalization, where models are deployed and feature data is looked up during inference. For information on the basic concepts of feature store, see [feature store concepts](fs-concepts).

In this tutorial series, you will experience how features seamlessly integrate all the phases of the ML lifecycle.

This tutorial is the first part of a three-part series. In this tutorial, you will:
- Create a new minimal feature store resource
- Develop and test a feature set locally with feature transformation capability
- Register a feature store entity with the feature store
- Register the feature set that you developed with the feature store
- Generate a sample training dataframe using the features you created
- Enable offline materialization on the feature set, and backfill the feature data


## Prerequisites

> [!NOTE]
> This tutorial uses Azure Machine Learning notebook with **Serverless Spark Compute**.

Before following the steps in this article, make sure you have the following prerequisites:

* An Azure Machine Learning workspace. If you don't have one, use the steps in the [Quickstart: Create workspace resources](https://learn.microsoft.com/en-us/azure/machine-learning/quickstart-create-resources?view=azureml-api-2) article to create one.
* To perform the steps in this article, your user account must be assigned the Owner role on the resource group where the feature store will be created.

## Set up

This tutorial uses the Python feature store core SDK (`azureml-featurestore`). The Python SDK is used for create, read, update, and delete (CRUD) operations on feature stores, feature sets, and feature store entities.

You don't need to explicitly install these resources for this tutorial, because in the set-up instructions shown here, the `conda.yml` file covers them.

To prepare the notebook environment for development:

1. In the Azure Machine Learning studio environment, select Notebooks on the left pane, and then select the Samples tab.

1. Browse to the featurestore_sample directory (select **Samples** > **SDK v2** > **sdk** > **python** > **featurestore_sample**), and then select **Clone**.

1. The **Select target directory** panel opens. Select the **Users** directory and then select _your user name_, and then select **Clone**.

1. Run the tutorial

   * Option 1: Create a new notebook, and execute the instructions in this document, step by step.
   * Option 2: Open existing notebook `featurestore_sample/notebooks/sdk_only/1. Develop a feature set and register with managed feature store.ipynb`. You may keep this document open and refer to it for more explanation and documentation links.

1. To configure the notebook environment, you must upload the `conda.yml` file:

   1. Select **Notebooks** on the left pane, and then select the **Files** tab.
   1. Browse to the *env* directory (select **Users** > *your_user_name* > **featurestore_sample** > **project** > **env**), and then select the conda.yml file.
   1. Select **Download**
   1. Select **Serverless Spark Compute** in the top navigation **Compute** dropdown. This operation might take one to two minutes. Wait for a status bar in the top to display **Configure session**.
   1. Select **Configure session** in the top status bar.
   1. Select **Python packages**.
   1. Select **Upload conda file**.
   1. Select the `conda.yml` you downloaded on your local device.
   1. (Optional) Increase the session time-out (idle time in minutes) to reduce the serverless spark cluster startup time.

__Important:__ Except for this step, you must rerun all the other steps every time you start a new Spark session or the session times out.


#### Start spark session
Execute the following code cell to start the Spark session. It will take approximately 10 minutes to install all dependencies and start the Spark session.

In [None]:
# Run this cell to start the Spark session (any code block will start the session). This can take around 10 minutes.
print("start spark session")

#### Setup root directory for the samples
This code cell sets up the root directory for the samples.

In [None]:
import os

# Please update <your_user_alias> below (or any custom directory you uploaded the samples to).
# You can find the name from the directory structure in the left navigation panel.
root_dir = "./Users/<your_user_alias>/featurestore_sample"

if os.path.isdir(root_dir):
    print("The folder exists.")
else:
    print("The folder does not exist. Please create or fix the path")

## Note
Feature store vs. project workspace: You use a feature store to reuse features across projects. You use a project workspace (the current workspace) to train models and run inference, leveraging features from feature stores. Many project workspaces can share and reuse the same feature store.

## Note
In this tutorial, you will use two SDKs:

1. Feature store CRUD SDK: You will use the same SDK, `MLClient` (package name `azure-ai-ml`), that you use with an Azure ML workspace. Because a feature store is implemented as a type of workspace, this SDK handles CRUD operations (create, read, update, and delete) for feature stores, feature sets, and feature store entities.
2. Feature store core SDK: This SDK (`azureml-featurestore`) is meant for feature set development and consumption (you will learn more about these operations later):
   - List/get a registered feature set
   - Generate/resolve a feature retrieval spec
   - Execute a feature set definition to generate a Spark dataframe
   - Generate training data using a point-in-time join

For this tutorial, you don't need to install either of these explicitly, because the `conda.yml` file from the setup step above includes them.

## Step 1: Create a minimal feature store

#### Step 1a: Set feature store parameters
Set name, location and other values for the feature store.

In [None]:
# We use the subscription, resource group, region of this active project workspace.
# You can optionally replace them to create the resources in a different subscription/resource group, or use existing resources.
import os

featurestore_name = "<FEATURESTORE_NAME>"
featurestore_location = "eastus"
featurestore_subscription_id = os.environ["AZUREML_ARM_SUBSCRIPTION"]
featurestore_resource_group_name = os.environ["AZUREML_ARM_RESOURCEGROUP"]

#### Step 1b: Create the feature store

In [None]:
from azure.ai.ml import MLClient
from azure.ai.ml.entities import (
    FeatureStore,
    FeatureStoreEntity,
    FeatureSet,
)
from azure.ai.ml.identity import AzureMLOnBehalfOfCredential

ml_client = MLClient(
    AzureMLOnBehalfOfCredential(),
    subscription_id=featurestore_subscription_id,
    resource_group_name=featurestore_resource_group_name,
)


fs = FeatureStore(name=featurestore_name, location=featurestore_location)
# wait for feature store creation
fs_poller = ml_client.feature_stores.begin_create(fs)
print(fs_poller.result())

#### Step 1c: Initialize AzureML feature store core SDK client
As explained above, this is used to develop and consume features.

In [None]:
# feature store client
from azureml.featurestore import FeatureStoreClient
from azure.ai.ml.identity import AzureMLOnBehalfOfCredential

featurestore = FeatureStoreClient(
    credential=AzureMLOnBehalfOfCredential(),
    subscription_id=featurestore_subscription_id,
    resource_group_name=featurestore_resource_group_name,
    name=featurestore_name,
)

## Step 2: Prototype and develop a transaction rolling aggregation feature set in this notebook

#### Step 2a: Explore the transactions source data

#### Note
The sample data used in this notebook is hosted in a publicly accessible blob container. It can only be read in Spark via the `wasbs` driver. When you create feature sets using your own source data, host the data in an ADLS Gen2 account and use the `abfss` driver in the data path.

In [None]:
# Read the sample transactions source data from the public blob container.
transactions_source_data_path = "wasbs://data@azuremlexampledata.blob.core.windows.net/feature-store-prp/datasources/transactions-source/*.parquet"
transactions_src_df = spark.read.parquet(transactions_source_data_path)

display(transactions_src_df.head(5))
# Note: display(transactions_src_df.head(5)) displays the timestamp column in a different format. You can call transactions_src_df.show() to see the correctly formatted value.

#### Step 2b: Develop a transactions feature set locally

A feature set specification is a self-contained definition of a feature set that can be developed and tested locally.

Let's create the following rolling window aggregate features:
- transactions 3-day count
- transactions amount 3-day sum
- transactions amount 3-day avg
- transactions 7-day count
- transactions amount 7-day sum
- transactions amount 7-day avg

__Action__:
- Inspect the feature transformation code file: `featurestore/featuresets/transactions/transformation_code/transaction_transform.py`. You will see how the rolling aggregation is defined for the features. This is a Spark transformer.

To understand the feature set and transformations in more detail, see [feature store concepts](https://learn.microsoft.com/azure/machine-learning/concept-what-is-managed-feature-store).
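To make the idea of a rolling window aggregate concrete, here is a minimal plain-Python sketch. It is not the Spark transformer from `transaction_transform.py`. The function name `rolling_aggregates`, the tuple layout, the output keys, and the window convention (the current event is included, and the window covers the preceding `window_days` days) are all assumptions chosen for illustration.

```python
from datetime import datetime, timedelta


def rolling_aggregates(events, window_days):
    """Hypothetical helper: for each (account_id, timestamp, amount) event,
    aggregate that account's events in (timestamp - window_days, timestamp]."""
    window = timedelta(days=window_days)
    results = []
    for account_id, ts, _ in events:
        # Collect amounts for the same account that fall inside the window.
        amounts = [
            amount
            for other_id, other_ts, amount in events
            if other_id == account_id and ts - window < other_ts <= ts
        ]
        total = sum(amounts)
        results.append(
            {
                "accountID": account_id,
                "timestamp": ts,
                "count": len(amounts),
                "sum": total,
                # The current event is always in the window, so len(amounts) >= 1.
                "avg": total / len(amounts),
            }
        )
    return results


# Made-up sample events: (account ID, event time, transaction amount).
events = [
    ("A1", datetime(2023, 1, 1), 10.0),
    ("A1", datetime(2023, 1, 2), 20.0),
    ("A1", datetime(2023, 1, 6), 30.0),
    ("A2", datetime(2023, 1, 2), 5.0),
]

three_day = rolling_aggregates(events, window_days=3)
seven_day = rolling_aggregates(events, window_days=7)
```

For the `A1` event on January 6, the 3-day window contains only that event, while the 7-day window also picks up the January 1 and January 2 transactions. The Spark transformer in the sample computes the same kind of aggregates at scale using Spark window functions.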

In [None]:
from azureml.featurestore import create_feature_set_spec
from azureml.featurestore.contracts import (
    DateTimeOffset,
    TransformationCode,
    Column,
    ColumnType,
    SourceType,
    TimestampColumn,
)
from azureml.featurestore.feature_source import ParquetFeatureSource

transactions_featureset_code_path = (
    root_dir + "/featurestore/featuresets/transactions/transformation_code"
)

transactions_featureset_spec = create_feature_set_spec(
    source=ParquetFeatureSource(
        path="wasbs://data@azuremlexampledata.blob.core.windows.net/feature-store-prp/datasources/transactions-source/*.parquet",
        timestamp_column=TimestampColumn(name="timestamp"),
        source_delay=DateTimeOffset(days=0, hours=0, minutes=20),
    ),
    feature_transformation=TransformationCode(
        path=transactions_featureset_code_path,
        transformer_class="transaction_transform.TransactionFeatureTransformer",
    ),
    index_columns=[Column(name="accountID", type=ColumnType.string)],
    source_lookback=DateTimeOffset(days=7, hours=0, minutes=0),
    temporal_join_lookback=DateTimeOffset(days=1, hours=0, minutes=0),
    infer_schema=True,
)

In [None]:
# Generate a Spark dataframe from the feature set specification
transactions_fset_df = transactions_featureset_spec.to_spark_dataframe()
# Display a few records
display(transactions_fset_df.head(5))

#### Step 2c:  Export as feature set specification
To register the feature set specification with the feature store, it needs to be saved in a specific format.
Action: Inspect the generated `transactions` FeaturesetSpec. Open this file from the file tree to see the specification: `featurestore/featuresets/transactions/spec/FeaturesetSpec.yaml`

The specification contains these important elements:

1. `source`: Reference to a storage. In this case a parquet file in a blob storage.
2. `features`: List of features and their datatypes. If you provide transformation code, the code has to return a dataframe that maps to the features and data types.
3. `index_columns`: The join keys required to access values from the feature set

Learn more about it in the [top level feature store entities document](https://learn.microsoft.com/azure/machine-learning/concept-top-level-entities-in-managed-feature-store) and the [feature set specification YAML reference](https://learn.microsoft.com/azure/machine-learning/reference-yaml-featureset-spec).

The additional benefit of persisting the feature set specification is that it can be source controlled.

In [None]:
import os

# Create a new folder to dump the feature set specification.
transactions_featureset_spec_folder = (
    root_dir + "/featurestore/featuresets/transactions/spec"
)

# Check if the folder exists, create one if it does not exist.
if not os.path.exists(transactions_featureset_spec_folder):
    os.makedirs(transactions_featureset_spec_folder)

transactions_featureset_spec.dump(transactions_featureset_spec_folder, overwrite=True)

## Step 3: Register a feature store entity
An entity helps enforce the best practice that the same join key definitions are used across feature sets that use the same logical entities. Examples of entities include an account entity and a customer entity. Entities are typically created once and reused across feature sets. For information on the basic concepts of feature store, see [feature store concepts](https://learn.microsoft.com/azure/machine-learning/concept-what-is-managed-feature-store).
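The following sketch illustrates why a shared entity keeps join keys consistent. It uses hypothetical dataclasses (`EntitySketch`, `FeatureSetSketch`), not the `azure-ai-ml` classes, and the second feature set name is made up for illustration.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class EntitySketch:
    """Hypothetical stand-in for a feature store entity."""
    name: str
    index_columns: tuple  # (column name, type) pairs that form the join key


@dataclass(frozen=True)
class FeatureSetSketch:
    """Hypothetical stand-in for a feature set that references an entity."""
    name: str
    entity: EntitySketch

    @property
    def join_keys(self):
        # Join keys come from the entity, not from the feature set itself.
        return self.entity.index_columns


account = EntitySketch(name="account", index_columns=(("accountID", "string"),))

transactions_sketch = FeatureSetSketch(name="transactions", entity=account)
other_sketch = FeatureSetSketch(name="some_other_feature_set", entity=account)

# Both feature sets resolve to the same join key definition.
same_keys = transactions_sketch.join_keys == other_sketch.join_keys
```

Because each feature set looks up its join keys through the entity, the key name and type are defined once, and every feature set that references the entity agrees on them.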

#### Step 3a: Initialize the feature store CRUD client

As explained at the beginning of this tutorial, `MLClient` is used for CRUD operations on feature store assets. The code below looks up the feature store you created in an earlier step. You cannot reuse the `ml_client` from Step 1b here, because that client is scoped at the resource group level, which is a prerequisite for creating a feature store. The client below is scoped at the feature store level.
 

In [None]:
# MLClient for feature store.
fs_client = MLClient(
    AzureMLOnBehalfOfCredential(),
    featurestore_subscription_id,
    featurestore_resource_group_name,
    featurestore_name,
)

#### Step 3b: Register `account` entity with the feature store
Create an account entity that has the join key `accountID` of type `string`.

In [None]:
from azure.ai.ml.entities import DataColumn, DataColumnType

account_entity_config = FeatureStoreEntity(
    name="account",
    version="1",
    index_columns=[DataColumn(name="accountID", type=DataColumnType.STRING)],
    stage="Development",
    description="This entity represents user account index key accountID.",
    tags={"data_type": "nonPII"},
)

poller = fs_client.feature_store_entities.begin_create_or_update(account_entity_config)
print(poller.result())

## Step 4: Register the transaction feature set with the feature store
You register a feature set asset with the feature store so that you can share and reuse it with others. You also get managed capabilities like versioning and materialization (covered later in this tutorial series).

The feature set asset has reference to the feature set spec that you created earlier and additional properties like version and materialization settings.

In [None]:
from azure.ai.ml.entities import FeatureSetSpecification

transaction_fset_config = FeatureSet(
    name="transactions",
    version="1",
    description="7-day and 3-day rolling aggregation of transactions featureset",
    entities=[f"azureml:account:1"],
    stage="Development",
    specification=FeatureSetSpecification(path=transactions_featureset_spec_folder),
    tags={"data_type": "nonPII"},
)

poller = fs_client.feature_sets.begin_create_or_update(transaction_fset_config)
print(poller.result())

#### Explore the feature store UI
* Go to the [Azure ML global landing page](https://ml.azure.com/home).
* Click on **Feature stores** in the left navigation.
* You will see the list of feature stores that you have access to. Click on the feature store that you have created above.

You can see the feature sets and entities that you have created.

Note: Creating and updating feature store assets (feature sets and entities) is possible only through SDK and CLI. You can use the UI to search/browse the feature store.

#### Grant "Storage Blob Data Reader" role on the offline store to your user identity
If feature data is materialized, then you need this role to read feature data from offline materialization store.
- Get your AAD object ID from the Azure portal by following these instructions: https://learn.microsoft.com/en-us/partner-center/find-ids-and-domain-names#find-the-user-object-id
- Get information about the offline materialization store from the Feature Store **Overview** page in the Feature Store UI. The storage account subscription ID, storage account resource group name, and storage account name for offline materialization store can be found on **Offline materialization store** card. 
![OFFLINE_STORE_INFO](./images/offline-store-information.png) 

To learn more about access control, see access control document in the docs.

Execute the following code cell for role assignment. Please note that it may take some time for permissions to propagate. 

In [None]:
# This utility function is created for ease of use in the docs tutorials. It uses standard Azure APIs.
# You can optionally inspect it `featurestore/setup/setup_storage_uai.py`.
import sys

sys.path.insert(0, root_dir + "/featurestore/setup")
from setup_storage_uai import grant_user_aad_storage_data_reader_role

your_aad_objectid = "<USER_AAD_OBJECTID>"
storage_subscription_id = "<SUBSCRIPTION_ID>"
storage_resource_group_name = "<RESOURCE_GROUP>"
storage_account_name = "<STORAGE_ACCOUNT_NAME>"

grant_user_aad_storage_data_reader_role(
    AzureMLOnBehalfOfCredential(),
    your_aad_objectid,
    storage_subscription_id,
    storage_resource_group_name,
    storage_account_name,
)

## Step 5: Generate a training data dataframe using the registered features

#### Step 5a: Load observation data

Start by exploring the observation data. Observation data is typically the core data used in training and inference. It is joined with feature data to create the full training data. Observation data is captured at the time of the event: in this case, it contains core transaction data, including transaction ID, account ID, and transaction amount. Because this data is for training, it also has the target variable appended (`is_fraud`).

To learn more about core concepts, including observation data, refer to the feature store documentation.

In [None]:
observation_data_path = "wasbs://data@azuremlexampledata.blob.core.windows.net/feature-store-prp/observation_data/train/*.parquet"
observation_data_df = spark.read.parquet(observation_data_path)
obs_data_timestamp_column = "timestamp"

display(observation_data_df)
# Note: the timestamp column is displayed in a different format. Optionally, you can call observation_data_df.show() to see the correctly formatted value.

#### Step 5b: Get the registered feature set and list its features

In [None]:
# Look up the featureset by providing a name and a version.
transactions_featureset = featurestore.feature_sets.get("transactions", "1")
# List its features.
transactions_featureset.features

In [None]:
# Print sample values.
display(transactions_featureset.to_spark_dataframe().head(5))

#### Step 5c: Select features and generate training data
In this step we will select features that we would like to be part of training data and use the feature store SDK to generate the training data.

In [None]:
from azureml.featurestore import get_offline_features

# You can select features in pythonic way.
features = [
    transactions_featureset.get_feature("transaction_amount_7d_sum"),
    transactions_featureset.get_feature("transaction_amount_7d_avg"),
]

# You can also specify features in string form: featureset:version:feature.
more_features = [
    f"transactions:1:transaction_3d_count",
    f"transactions:1:transaction_amount_3d_avg",
]

more_features = featurestore.resolve_feature_uri(more_features)
features.extend(more_features)

# Generate training dataframe by using feature data and observation data.
training_df = get_offline_features(
    features=features,
    observation_data=observation_data_df,
    timestamp_column=obs_data_timestamp_column,
)

# Ignore the message that says feature set is not materialized (materialization is optional). We will enable materialization in the subsequent part of the tutorial.
display(training_df)
# Note: the timestamp column is displayed in a different format. Optionally, you can call training_df.show() to see the correctly formatted value.

You can see how the features are appended to the training data using a point-in-time join.
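To illustrate what a point-in-time join does conceptually, here is a minimal plain-Python sketch. It is not how `get_offline_features` is implemented. The function name `point_in_time_join`, the tuple layouts, and the `lookback` handling (loosely modeled on the idea behind `temporal_join_lookback`) are assumptions for illustration only.

```python
from bisect import bisect_right
from datetime import datetime, timedelta


def point_in_time_join(observations, feature_rows, lookback):
    """Hypothetical helper: attach to each (key, obs_ts) observation the latest
    feature value with timestamp <= obs_ts and no older than `lookback`."""
    # Group feature rows per key, sorted by timestamp.
    by_key = {}
    for key, ts, value in sorted(feature_rows, key=lambda r: (r[0], r[1])):
        by_key.setdefault(key, []).append((ts, value))

    joined = []
    for key, obs_ts in observations:
        rows = by_key.get(key, [])
        timestamps = [ts for ts, _ in rows]
        # Index of the last feature row at or before the observation time.
        idx = bisect_right(timestamps, obs_ts) - 1
        value = None
        if idx >= 0 and obs_ts - rows[idx][0] <= lookback:
            value = rows[idx][1]
        joined.append((key, obs_ts, value))
    return joined


# Made-up feature rows: (account ID, feature timestamp, feature value).
feature_rows = [
    ("A1", datetime(2023, 1, 1), 10.0),
    ("A1", datetime(2023, 1, 2), 30.0),
    ("A1", datetime(2023, 1, 5), 60.0),
]

# Made-up observations: (account ID, observation timestamp).
observations = [
    ("A1", datetime(2023, 1, 2, 12)),
    ("A1", datetime(2023, 1, 4, 12)),
    ("A2", datetime(2023, 1, 2, 12)),
]

joined = point_in_time_join(observations, feature_rows, lookback=timedelta(days=1))
```

The key property is that an observation never receives a feature value computed after its own timestamp, which avoids leaking future information into training data. In this sketch, the second observation gets no value because the latest earlier feature row is older than the lookback, and the January 5 row is ignored because it lies in the future.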

## Step 6: Enable offline materialization on transactions feature set
Once materialization is enabled on a feature set, you can perform backfill (this tutorial) or schedule recurrent materialization jobs (shown in a later tutorial).

#### Set spark.sql.shuffle.partitions in the yaml file according to the feature data size

The Spark configuration `spark.sql.shuffle.partitions` is an OPTIONAL parameter that can affect the number of parquet files generated (per day) when the feature set is materialized into the offline store. The default value of this parameter is 200. The best practice is to avoid generating many small parquet files. If offline feature retrieval becomes slow after the feature set is materialized, check the corresponding folder in the offline store to see whether it contains too many small parquet files (per day), and adjust the value of this parameter accordingly.

*Note: The sample data used in this notebook is small. So this parameter is set to 1 in the code below.*

In [None]:
from azure.ai.ml.entities import (
    MaterializationSettings,
    MaterializationComputeResource,
)

transactions_fset_config = fs_client.feature_sets.get(name="transactions", version="1")

transactions_fset_config.materialization_settings = MaterializationSettings(
    offline_enabled=True,
    resource=MaterializationComputeResource(instance_type="standard_e8s_v3"),
    spark_configuration={
        "spark.driver.cores": 4,
        "spark.driver.memory": "36g",
        "spark.executor.cores": 4,
        "spark.executor.memory": "36g",
        "spark.executor.instances": 2,
        "spark.sql.shuffle.partitions": 1,
    },
    schedule=None,
)

fs_poller = fs_client.feature_sets.begin_create_or_update(transactions_fset_config)
print(fs_poller.result())

Optionally, you can save the above feature set asset as YAML.

In [None]:
# Optional: save the feature set asset as YAML.
transactions_fset_config.dump(
    root_dir
    + "/featurestore/featuresets/transactions/featureset_asset_offline_enabled.yaml"
)

## Step 7: Backfill data for transactions feature set
Materialization is the process of computing the feature values for a given feature window and storing them in a materialization store. Materializing the features increases their reliability and availability. All feature queries will then use the materialized values from the materialization store. In this step, you perform a one-time backfill for a feature window of __18 months__.

#### Note
How do you determine the window of backfill data needed? It has to match the window of your training data. For example, if you want to train with two years of data, you need to retrieve features for the same window, so you backfill a two-year window.
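As a quick sanity check before submitting a backfill, you can verify that the backfill window contains the training window you intend to use. The helper below, `backfill_covers_training`, is a hypothetical convenience function and not part of either SDK. The training dates are made-up examples.

```python
from datetime import datetime


def backfill_covers_training(backfill_start, backfill_end, train_start, train_end):
    """Return True when the backfill window fully contains the training window."""
    return backfill_start <= train_start and train_end <= backfill_end


# Backfill window used in this tutorial.
backfill_start = datetime(2022, 1, 1)
backfill_end = datetime(2023, 6, 30)

# A training window inside the backfill window.
covered = backfill_covers_training(
    backfill_start, backfill_end, datetime(2022, 3, 1), datetime(2023, 5, 31)
)

# A training window that starts before the backfill window.
not_covered = backfill_covers_training(
    backfill_start, backfill_end, datetime(2021, 6, 1), datetime(2023, 5, 31)
)
```

If the check fails, widen the backfill window (or narrow the training window) so that feature values for the whole training period are available from the materialization store.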

In [None]:
from datetime import datetime
from azure.ai.ml.entities import DataAvailabilityStatus

st = datetime(2022, 1, 1, 0, 0, 0, 0)
et = datetime(2023, 6, 30, 0, 0, 0, 0)

poller = fs_client.feature_sets.begin_backfill(
    name="transactions",
    version="1",
    feature_window_start_time=st,
    feature_window_end_time=et,
    data_status=[DataAvailabilityStatus.NONE],
)
print(poller.result().job_ids)

In [None]:
# Get the job URL, and stream the job logs.
fs_client.jobs.stream(poller.result().job_ids[0])

Let's print sample data from the feature set. Notice from the output information that the data was retrieved from the materialization store. The `get_offline_features()` method, which is used to retrieve training/inference data, also uses the materialization store by default.

In [None]:
# Look up the feature set by providing a name and a version, and display a few records.
transactions_featureset = featurestore.feature_sets.get("transactions", "1")
display(transactions_featureset.to_spark_dataframe().head(5))

## Cleanup

Tutorial `5. Develop a feature set with custom source` has instructions for deleting the resources.

## Next steps
* Experiment and train models using features.