# Tutorial #4: Enable online materialization and run online inference
So far, you have learned how to develop features, materialize them to the offline materialization store, train a model, and perform batch inference. In this tutorial, you will learn how to use the feature store for online (real-time) inference use cases.

You will perform the following steps:

1. Set up Azure Cache for Redis.
1. Attach the cache to feature store as the online materialization store and grant necessary permissions.
1. Materialize feature sets to the online store.
1. Test online deployment with mock data.


# Prerequisites
1. Before proceeding, make sure that you have completed the previous three tutorials of this series. We will reuse the feature store and some other resources created in those tutorials.

# Set up

### Configure the Azure ML Spark notebook

1. In the "Compute" dropdown in the top navigation bar, select "Serverless Spark Compute".
1. Select "Configure session" in the top status bar, then select "Python packages" > "Upload conda file", and select the file `azureml-examples/sdk/python/featurestore-sample/project/env/online.yml` from your local machine. Optionally, increase the session timeout (idle time) to avoid rerunning the prerequisites frequently.




### Start Spark session
Execute the following code cell to start the Spark session. It will take approximately 10 minutes to install all dependencies and start the Spark session.

In [None]:
# Run this cell to start the Spark session (any code cell will start the session). This can take approximately 10 minutes.
print("start spark session")

### Set up the root directory for the samples

In [None]:
import os

# Please update the dir to ./Users/<your_user_alias> (or any custom directory you uploaded the samples to).
# You can find the name from the directory structure in the left navigation panel.
root_dir = "./Users/<your_user_alias>/featurestore_sample"

if os.path.isdir(root_dir):
    print("The folder exists.")
else:
    print("The folder does not exist. Please create or fix the path")

### Initialize the project workspace CRUD client
The `MLClient` for the current workspace, where you are running this tutorial notebook, will be used for create, read, update, and delete (CRUD) operations.

In [None]:
import os
from azure.ai.ml import MLClient
from azure.ai.ml.identity import AzureMLOnBehalfOfCredential

project_ws_sub_id = os.environ["AZUREML_ARM_SUBSCRIPTION"]
project_ws_rg = os.environ["AZUREML_ARM_RESOURCEGROUP"]
project_ws_name = os.environ["AZUREML_ARM_WORKSPACE_NAME"]

# Connect to the project workspace
ws_client = MLClient(
    AzureMLOnBehalfOfCredential(), project_ws_sub_id, project_ws_rg, project_ws_name
)
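The cell above reads the subscription, resource group, and workspace name from environment variables that the Azure ML notebook session is expected to provide. Outside such a session, `os.environ[...]` fails with a bare `KeyError` on the first missing variable. The following is a minimal sketch of a hypothetical helper, `read_workspace_scope` (not part of any Azure SDK), that reports every missing variable at once using only the standard library.

```python
import os
from typing import Mapping, NamedTuple


class WorkspaceScope(NamedTuple):
    subscription_id: str
    resource_group: str
    workspace_name: str


# The same environment variables that the tutorial cell reads.
_ENV_VARS = (
    "AZUREML_ARM_SUBSCRIPTION",
    "AZUREML_ARM_RESOURCEGROUP",
    "AZUREML_ARM_WORKSPACE_NAME",
)


def read_workspace_scope(env: Mapping[str, str] = os.environ) -> WorkspaceScope:
    """Hypothetical helper: return the workspace scope or list every missing variable."""
    missing = [name for name in _ENV_VARS if not env.get(name)]
    if missing:
        raise RuntimeError(
            "Missing environment variables: "
            + ", ".join(missing)
            + ". Are you running inside an Azure ML notebook session?"
        )
    return WorkspaceScope(*(env[name] for name in _ENV_VARS))
```

Because the fields are in the same order as the positional arguments of `MLClient`, `MLClient(AzureMLOnBehalfOfCredential(), *read_workspace_scope())` would be equivalent to the tutorial cell.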

### Initialize the CRUD client of the feature store workspace
Initialize the `MLClient` for the feature store workspace. It is used for create, read, update, and delete (CRUD) operations on the feature store workspace.

In [None]:
from azure.ai.ml import MLClient
from azure.ai.ml.identity import AzureMLOnBehalfOfCredential

# Feature store
featurestore_name = (
    "<FEATURESTORE_NAME>"  # use the same name from part #1 of the tutorial
)
featurestore_subscription_id = os.environ["AZUREML_ARM_SUBSCRIPTION"]
featurestore_resource_group_name = os.environ["AZUREML_ARM_RESOURCEGROUP"]

# Feature store MLClient
fs_client = MLClient(
    AzureMLOnBehalfOfCredential(),
    featurestore_subscription_id,
    featurestore_resource_group_name,
    featurestore_name,
)

### Initialize the feature store core SDK client
This tutorial uses the Python feature store core SDK (`azureml-featurestore`). The SDK client initialized here is used for create, read, update, and delete (CRUD) operations on feature stores, feature sets, and feature store entities.

In [None]:
from azureml.featurestore import FeatureStoreClient
from azure.ai.ml.identity import AzureMLOnBehalfOfCredential

featurestore = FeatureStoreClient(
    credential=AzureMLOnBehalfOfCredential(),
    subscription_id=featurestore_subscription_id,
    resource_group_name=featurestore_resource_group_name,
    name=featurestore_name,
)

## Set up Azure Cache for Redis
This tutorial uses Azure Cache for Redis as the online materialization store. You can either create a new Redis instance or reuse an existing one.

### Set values for the Azure Cache for Redis that will be used as online materialization store
In the following code cell, define the name of the Azure Cache for Redis that you want to create or reuse. Optionally, you can override other default settings.

In [None]:
ws_location = ws_client.workspaces.get(ws_client.workspace_name).location

redis_subscription_id = os.environ["AZUREML_ARM_SUBSCRIPTION"]
redis_resource_group_name = os.environ["AZUREML_ARM_RESOURCEGROUP"]
redis_name = "<REDIS_NAME>"
redis_location = ws_location
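The `redis_name` value in the cell above is a placeholder, and forgetting to replace it only surfaces later as a provisioning error. The following minimal sketch is a hypothetical pre-flight check. It assumes the naming rules shown by the Azure portal at the time of writing (1 to 63 characters; letters, digits, and hyphens only; must start and end with a letter or digit; no consecutive hyphens). Verify these rules against the current Azure documentation before relying on the check.

```python
import re

# Assumed naming rules for Azure Cache for Redis (verify against current docs):
# 1-63 characters, letters/digits/hyphens only, starts and ends with a letter
# or digit, and no consecutive hyphens.
_REDIS_NAME_PATTERN = re.compile(r"[A-Za-z0-9](?:[A-Za-z0-9-]{0,61}[A-Za-z0-9])?")


def is_valid_redis_name(name: str) -> bool:
    """Hypothetical pre-flight check for the redis_name value."""
    return _REDIS_NAME_PATTERN.fullmatch(name) is not None and "--" not in name
```

In the notebook, `assert is_valid_redis_name(redis_name), redis_name` would stop the run early if the `<REDIS_NAME>` placeholder was left unchanged.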

### Azure Cache for Redis (option 1): create new Redis instance
You can select the Redis cache tier (Basic, Standard, Premium, or Enterprise). Choose a SKU family that is available for the selected cache tier. See this documentation page to learn more about [how selecting different tiers may affect cache performance](https://learn.microsoft.com/azure/azure-cache-for-redis/cache-best-practices-performance). See this link to learn more about [pricing for different SKU tiers and families of Azure Cache for Redis](https://azure.microsoft.com/en-us/pricing/details/cache/).

Execute the following code cell to create an Azure Cache for Redis with premium tier, SKU family `P` and cache capacity 2. It may take approximately 5-10 minutes to provision the Redis instance.

In [None]:
from azure.mgmt.redis import RedisManagementClient
from azure.mgmt.redis.models import RedisCreateParameters, Sku, SkuFamily, SkuName

management_client = RedisManagementClient(
    AzureMLOnBehalfOfCredential(), redis_subscription_id
)

# It usually takes about 5 - 10 min to finish the provision of the Redis instance.
# If the following begin_create() call still hangs for longer than that,
# please check the status of the Redis instance on the Azure portal and cancel the cell if the provision has completed.
# This sample uses a PREMIUM tier Redis SKU from family P, which may cost more than a STANDARD tier SKU from family C.
# Please choose the SKU tier and family according to your performance and pricing requirements.

redis_arm_id = (
    management_client.redis.begin_create(
        resource_group_name=redis_resource_group_name,
        name=redis_name,
        parameters=RedisCreateParameters(
            location=redis_location,
            sku=Sku(name=SkuName.PREMIUM, family=SkuFamily.P, capacity=2),
        ),
    )
    .result()
    .id
)

print(redis_arm_id)
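The cell above hard-codes `SkuName.PREMIUM`, `SkuFamily.P`, and capacity `2`. If you change the tier, the family and capacity must change with it. The following minimal sketch is a hypothetical helper that checks a combination before you call `begin_create()`. It assumes the commonly documented mapping (Basic and Standard use family `C` with capacity 0-6; Premium uses family `P` with capacity 1-5) and uses plain strings instead of the `azure.mgmt.redis` enums. Enterprise tiers are not covered, and you should confirm the ranges against the current Azure pricing page.

```python
# Assumed SKU rules for Azure Cache for Redis (verify against current docs):
#   Basic, Standard -> family "C", capacity 0-6 (C0-C6)
#   Premium         -> family "P", capacity 1-5 (P1-P5)
_ALLOWED_SKUS = {
    "basic": ("C", range(0, 7)),
    "standard": ("C", range(0, 7)),
    "premium": ("P", range(1, 6)),
}


def check_redis_sku(tier: str, family: str, capacity: int) -> None:
    """Hypothetical helper: raise ValueError for a tier/family/capacity mismatch."""
    key = tier.lower()
    if key not in _ALLOWED_SKUS:
        raise ValueError(f"Tier {tier!r} is not covered by this check")
    expected_family, capacities = _ALLOWED_SKUS[key]
    if family.upper() != expected_family:
        raise ValueError(
            f"Tier {tier!r} requires family {expected_family!r}, got {family!r}"
        )
    if capacity not in capacities:
        raise ValueError(
            f"Capacity {capacity} is outside {capacities.start}-{capacities.stop - 1} "
            f"for tier {tier!r}"
        )
```

The combination used in this tutorial passes: `check_redis_sku("Premium", "P", 2)` returns without raising.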

### Azure Cache for Redis (option 2): use existing Redis instance
Optionally, you can reuse an existing Redis instance with the previously defined name by executing the following code.

In [None]:
redis_arm_id = "/subscriptions/{sub_id}/resourceGroups/{rg}/providers/Microsoft.Cache/Redis/{name}".format(
    sub_id=redis_subscription_id,
    rg=redis_resource_group_name,
    name=redis_name,
)
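Option 2 builds the ARM ID from its parts, while option 1 takes it from the `.id` returned by the service, whose casing can differ. ARM resource IDs are case-insensitive, so a comparison should ignore case. The following minimal sketch is a hypothetical parser, written in plain Python, that splits a Redis ARM ID back into its parts so that you can sanity-check whichever ID you ended up with. The `azure-mgmt-core` package also provides `azure.mgmt.core.tools.parse_resource_id` for general-purpose parsing.

```python
from typing import NamedTuple


class RedisResourceId(NamedTuple):
    subscription_id: str
    resource_group: str
    name: str


# Fixed path segments (lowercase); None marks a variable segment.
_EXPECTED = [
    "subscriptions", None, "resourcegroups", None,
    "providers", "microsoft.cache", "redis", None,
]


def parse_redis_arm_id(arm_id: str) -> RedisResourceId:
    """Hypothetical helper: split a Redis ARM ID, comparing fixed segments case-insensitively."""
    parts = arm_id.strip("/").split("/")
    if len(parts) != len(_EXPECTED):
        raise ValueError(f"Not a Redis ARM ID: {arm_id!r}")
    for actual, literal in zip(parts, _EXPECTED):
        ok = bool(actual) if literal is None else actual.lower() == literal
        if not ok:
            raise ValueError(f"Not a Redis ARM ID: {arm_id!r}")
    return RedisResourceId(parts[1], parts[3], parts[7])
```

For example, `parse_redis_arm_id(redis_arm_id).name == redis_name` should hold after either option.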

# Step 1: Attach the online materialization store to the feature store
Attach the Azure Cache for Redis to the feature store to be used as the online materialization store.

In [None]:
from azure.ai.ml.entities import (
    ManagedIdentityConfiguration,
    FeatureStore,
    MaterializationStore,
)

online_store = MaterializationStore(type="redis", target=redis_arm_id)

ml_client = MLClient(
    AzureMLOnBehalfOfCredential(),
    subscription_id=featurestore_subscription_id,
    resource_group_name=featurestore_resource_group_name,
)

fs = FeatureStore(
    name=featurestore_name,
    online_store=online_store,
)

fs_poller = ml_client.feature_stores.begin_create(fs)
print(fs_poller.result())

# Step 2: Materialize `accounts` feature set data to online store

### Enable materialization on the `accounts` feature set
In the previous parts of the tutorial series, we did **not** materialize the `accounts` feature set, because it had precomputed features and was used only for batch inference scenarios. In this step, we will enable online materialization so that the features are available in the online store with low-latency access. We will also enable offline materialization for consistency; enabling offline materialization is optional.

In [None]:
from azure.ai.ml.entities import (
    MaterializationSettings,
    MaterializationComputeResource,
)

# Turn on both offline and online materialization on the "accounts" feature set.

accounts_fset_config = fs_client._featuresets.get(name="accounts", version="1")

accounts_fset_config.materialization_settings = MaterializationSettings(
    offline_enabled=True,
    online_enabled=True,
    resource=MaterializationComputeResource(instance_type="standard_e8s_v3"),
    spark_configuration={
        "spark.driver.cores": 4,
        "spark.driver.memory": "36g",
        "spark.executor.cores": 4,
        "spark.executor.memory": "36g",
        "spark.executor.instances": 2,
    },
    schedule=None,
)

fs_poller = fs_client.feature_sets.begin_create_or_update(accounts_fset_config)
print(fs_poller.result())

### Backfill the `accounts` feature set
The `backfill` command backfills data to all the materialization stores that are enabled for this feature set. In this case, both offline and online materialization are enabled, so `backfill` will be performed on both the offline and online materialization stores.

In [None]:
from datetime import datetime, timedelta
from azure.ai.ml.entities import DataAvailabilityStatus

# Trigger backfill on the "accounts" feature set.
# Backfill from 01/01/2020 all the way to 3 hours ago.

st = datetime(2020, 1, 1, 0, 0, 0, 0)
et = datetime.now() - timedelta(hours=3)

poller = fs_client.feature_sets.begin_backfill(
    name="accounts",
    version="1",
    feature_window_start_time=st,
    feature_window_end_time=et,
    data_status=[DataAvailabilityStatus.NONE],
)
print(poller.result().job_ids)
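The backfill cell ends the feature window 3 hours before the current time. The following minimal sketch wraps that computation in a hypothetical helper, `backfill_window`, that fails early if the window would be empty (for example, if the start date is accidentally set in the future). The optional `now` argument exists only so the behavior can be checked deterministically. Like the tutorial cell, it uses naive datetimes from `datetime.now()`; this tutorial does not state whether the service treats them as local time or UTC, so verify that assumption for your setup.

```python
from datetime import datetime, timedelta
from typing import Optional, Tuple


def backfill_window(
    start: datetime,
    lag: timedelta = timedelta(hours=3),
    now: Optional[datetime] = None,
) -> Tuple[datetime, datetime]:
    """Hypothetical helper: return (start, end) with end = now - lag, rejecting empty windows."""
    end = (now if now is not None else datetime.now()) - lag
    if start >= end:
        raise ValueError(f"Empty backfill window: start {start} is not before end {end}")
    return start, end
```

In the notebook, `st, et = backfill_window(datetime(2020, 1, 1))` would produce the same values as the cell above.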

The following code cell tracks completion of the backfill job. Using the premium tier Azure Cache for Redis provisioned earlier, this step may take approximately 10 minutes to complete.

In [None]:
# Get the job URL, and stream the job logs.
# With PREMIUM Redis SKU, SKU family "P", and cache capacity 2,
# it takes approximately 10 minutes to complete.
fs_client.jobs.stream(poller.result().job_ids[0])

# Step 3: Materialize `transactions` feature set data to the online store

In the previous tutorials, we materialized data of the `transactions` feature set to the offline materialization store. In this step, we will:

1. Enable online materialization for the `transactions` feature set.

In [None]:
# Enable materialization to online store for the "transactions" feature set.

transactions_fset_config = fs_client._featuresets.get(name="transactions", version="1")
transactions_fset_config.materialization_settings.online_enabled = True

fs_poller = fs_client.feature_sets.begin_create_or_update(transactions_fset_config)
print(fs_poller.result())

2. Backfill the data to both the online and offline materialization stores to ensure that both have the latest data. Note that the recurrent materialization job, which was set up in tutorial 3 of this series, will now materialize data to both the online and offline materialization stores.

In [None]:
# Trigger backfill on the "transactions" feature set to fill in the online and offline stores.
# Backfill from 01/01/2020 all the way to 3 hours ago.

from datetime import datetime, timedelta
from azure.ai.ml.entities import DataAvailabilityStatus

st = datetime(2020, 1, 1, 0, 0, 0, 0)
et = datetime.now() - timedelta(hours=3)


poller = fs_client.feature_sets.begin_backfill(
    name="transactions",
    version="1",
    feature_window_start_time=st,
    feature_window_end_time=et,
    data_status=[DataAvailabilityStatus.NONE],
)
print(poller.result().job_ids)

The following code cell tracks completion of the backfill job. Using the premium tier Azure Cache for Redis provisioned earlier, this step may take approximately 5 minutes to complete.

In [None]:
# Get the job URL, and stream the job logs.
# With PREMIUM Redis SKU, SKU family "P", and cache capacity 2,
# it takes approximately 5 minutes to complete.
fs_client.jobs.stream(poller.result().job_ids[0])

# Step 4: Test locally
In this step, we will use our development environment (that is, this notebook) to look up features from the online materialization store.

First, we will parse the list of features from the existing feature retrieval specification:

In [None]:
# Parse the list of features from the existing feature retrieval specification.
feature_retrieval_spec_folder = root_dir + "/project/fraud_model/feature_retrieval_spec"

features = featurestore.resolve_feature_retrieval_spec(feature_retrieval_spec_folder)

features

Next, we will get feature values from the online materialization store:

In [None]:
from azureml.featurestore import init_online_lookup
import time

# Initialize the online store client.
init_online_lookup(features, AzureMLOnBehalfOfCredential())

Now, we will prepare some observation data for testing and use it to look up features from the online materialization store. During online lookup, the keys (`accountID`) defined in the sample observation data might not exist in the Redis instance (due to `TTL`). If this happens:
1. Open the Azure portal.
2. Navigate to the Redis instance.
3. Open the console for the Redis instance and check for existing keys using the command `KEYS *`.
4. Replace the `accountID` values in the sample observation data with the existing keys.

In [None]:
import pyarrow
from azureml.featurestore import get_online_features

# Prepare test observation data
obs = pyarrow.Table.from_pydict(
    {"accountID": ["A985156952816816", "A1055521248929430", "A914800935560176"]}
)

# Online lookup:
# The keys defined in the sample observation data above might not exist in Redis (due to TTL).
# If this happens, go to the Azure portal, navigate to the Redis instance, open its console, check for existing keys using the command "KEYS *",
# and replace the sample observation data with the existing keys.
df = get_online_features(features, obs)
df
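The note before the lookup cell attributes missing keys to TTL (time to live): a key in the online store can expire some time after it is written, so a lookup for an `accountID` that was materialized long ago can come back empty. The following toy `TTLStore` class is a minimal, hypothetical sketch of that behavior in plain Python. It is not how Redis or the feature store is implemented; it only models "written, then expired". The injectable `clock` lets you simulate time passing.

```python
import time
from typing import Callable, Dict, Hashable, List, Optional, Tuple


class TTLStore:
    """Toy key-value store with per-key expiry, for illustration only.

    This is not how Redis or the feature store is implemented; it only shows
    why a key written during backfill can be missing at lookup time.
    """

    def __init__(self, clock: Callable[[], float] = time.monotonic) -> None:
        self._clock = clock
        self._data: Dict[Hashable, Tuple[object, float]] = {}

    def set(self, key: Hashable, value: object, ttl_seconds: float) -> None:
        # Store the value together with an absolute expiry time.
        self._data[key] = (value, self._clock() + ttl_seconds)

    def get(self, key: Hashable) -> Optional[object]:
        entry = self._data.get(key)
        if entry is None:
            return None
        value, expires_at = entry
        if self._clock() >= expires_at:
            del self._data[key]  # drop the expired entry on access
            return None
        return value

    def live_keys(self) -> List[Hashable]:
        # Rough analogue of listing the existing keys in the Redis console.
        now = self._clock()
        return [key for key, (_, expires_at) in self._data.items() if now < expires_at]
```

When you check keys in a large cache, the Redis documentation recommends `SCAN` over `KEYS *`, because `KEYS` walks the whole keyspace in a single blocking call.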

Now that we have successfully looked up features from the online store, we will test online features using an Azure Machine Learning managed online endpoint.

# Step 5: Test online features from Azure Machine Learning managed online endpoint
Managed online endpoints provide the capability to deploy and score models for online (real-time) inference. Optionally, you can use any inference technology of your choice (such as Kubernetes).

As a part of this step, we will perform the following actions:

1. Create an Azure Machine Learning managed online endpoint.
1. Grant required role-based access control (RBAC) permissions.
1. Deploy the model that we trained in tutorial 3 of this series. The scoring script used in this step contains the code to look up online features.
1. Score the model with sample data. You will see that the online features are looked up and model scoring completes successfully.

## Create Azure Machine Learning managed online endpoint
You can learn more about managed online endpoints [here](https://learn.microsoft.com/azure/machine-learning/how-to-deploy-online-endpoints?view=azureml-api-2&tabs=azure-cli). Note that with the managed feature store API, you can also look up online features from other inference platforms, based on your needs.

The following code defines a managed online endpoint. Replace `<ENDPOINT_NAME>` with a name of your choice, for example `fraud-model`.

In [None]:
from azure.ai.ml.entities import (
    ManagedOnlineDeployment,
    ManagedOnlineEndpoint,
    Model,
    CodeConfiguration,
    Environment,
)


endpoint_name = "<ENDPOINT_NAME>"

endpoint = ManagedOnlineEndpoint(name=endpoint_name, auth_mode="key")
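The endpoint name in the cell above is a placeholder, and endpoint names must be unique within an Azure region, so a fixed name such as `fraud-model` can collide with an existing endpoint. The following minimal sketch shows two hypothetical helpers: one validates a name and one generates a name with a random suffix. The validation assumes the naming rules stated in the Azure ML documentation at the time of writing (3 to 32 characters, starting with a letter, containing only letters, digits, and hyphens). This sketch additionally requires the name to end with a letter or digit. Check the current documentation before relying on these rules.

```python
import re
import uuid

# Assumed naming rules for Azure ML online endpoints (verify against current docs):
# 3-32 characters, starts with a letter, letters/digits/hyphens only.
# This sketch also requires the last character to be a letter or digit.
_ENDPOINT_NAME_PATTERN = re.compile(r"[A-Za-z][A-Za-z0-9-]{1,30}[A-Za-z0-9]")


def is_valid_endpoint_name(name: str) -> bool:
    """Hypothetical pre-flight check for the endpoint_name value."""
    return _ENDPOINT_NAME_PATTERN.fullmatch(name) is not None


def unique_endpoint_name(prefix: str = "fraud-model") -> str:
    """Hypothetical helper: append an 8-character random hex suffix to reduce collisions."""
    name = f"{prefix}-{uuid.uuid4().hex[:8]}"
    if not is_valid_endpoint_name(name):
        raise ValueError(f"Generated endpoint name is invalid: {name!r}")
    return name
```

For example, `endpoint_name = unique_endpoint_name()` yields a 20-character name such as `fraud-model-` followed by eight hex digits.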

Execute the following code cell to create the managed online endpoint defined above.

In [None]:
ws_client.online_endpoints.begin_create_or_update(endpoint).result()

## Grant required RBAC permissions
In this step, we will grant the required RBAC permissions to the managed online endpoint on the Redis instance and the feature store. The scoring code in the model deployment needs these RBAC permissions to look up features from the online store using the managed feature store API.

### Get managed identity of the managed online endpoint

In [None]:
# Get managed identity of the managed online endpoint.
endpoint = ws_client.online_endpoints.get(endpoint_name)

model_endpoint_msi_principal_id = endpoint.identity.principal_id
model_endpoint_msi_principal_id

### Grant `Contributor` role to the online endpoint managed identity on the Azure Cache for Redis 
We will grant the `Contributor` role to the online endpoint managed identity on the Redis instance. The scoring code needs this RBAC permission to look up features from the Redis online store.

In [None]:
from azure.core.exceptions import ResourceExistsError
from azure.mgmt.msi import ManagedServiceIdentityClient
from azure.mgmt.msi.models import Identity
from azure.mgmt.authorization import AuthorizationManagementClient
from azure.mgmt.authorization.models import RoleAssignmentCreateParameters
from uuid import uuid4

auth_client = AuthorizationManagementClient(
    AzureMLOnBehalfOfCredential(), redis_subscription_id
)

scope = f"/subscriptions/{redis_subscription_id}/resourceGroups/{redis_resource_group_name}/providers/Microsoft.Cache/Redis/{redis_name}"


# The role definition ID for the "contributor" role on the redis cache
# You can find other built-in role definition IDs in the Azure documentation
role_definition_id = f"/subscriptions/{redis_subscription_id}/providers/Microsoft.Authorization/roleDefinitions/b24988ac-6180-42a0-ab88-20f7382dd24c"

# Generate a random UUID for the role assignment name
role_assignment_name = str(uuid4())

# Set up the role assignment creation parameters
role_assignment_params = RoleAssignmentCreateParameters(
    principal_id=model_endpoint_msi_principal_id,
    role_definition_id=role_definition_id,
    principal_type="ServicePrincipal",
)

# Create the role assignment
try:
    # Create the role assignment
    result = auth_client.role_assignments.create(
        scope, role_assignment_name, role_assignment_params
    )
    print(
        f"Redis RBAC granted to managed identity '{model_endpoint_msi_principal_id}'."
    )
except ResourceExistsError:
    print(
        f"Redis RBAC already exists for managed identity '{model_endpoint_msi_principal_id}'."
    )

### Grant `AzureML Data Scientist` role to the online endpoint managed identity on the feature store
We will grant `AzureML Data Scientist` role to the online endpoint managed identity on the feature store. This RBAC permission is required for successful deployment of the model to the online endpoint.

In [None]:
auth_client = AuthorizationManagementClient(
    AzureMLOnBehalfOfCredential(), featurestore_subscription_id
)

scope = f"/subscriptions/{featurestore_subscription_id}/resourceGroups/{featurestore_resource_group_name}/providers/Microsoft.MachineLearningServices/workspaces/{featurestore_name}"

# The role definition ID for the "AzureML Data Scientist" role.
# You can find other built-in role definition IDs in the Azure documentation.
role_definition_id = f"/subscriptions/{featurestore_subscription_id}/providers/Microsoft.Authorization/roleDefinitions/f6c7c914-8db3-469d-8ca1-694a8f32e121"

# Generate a random UUID for the role assignment name.
role_assignment_name = str(uuid4())

# Set up the role assignment creation parameters.
role_assignment_params = RoleAssignmentCreateParameters(
    principal_id=model_endpoint_msi_principal_id,
    role_definition_id=role_definition_id,
    principal_type="ServicePrincipal",
)

# Create the role assignment
try:
    # Create the role assignment
    result = auth_client.role_assignments.create(
        scope, role_assignment_name, role_assignment_params
    )
    print(
        f"Feature store RBAC granted to managed identity '{model_endpoint_msi_principal_id}'."
    )
except ResourceExistsError:
    print(
        f"Feature store RBAC already exists for managed identity '{model_endpoint_msi_principal_id}'."
    )
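The two RBAC cells above repeat the same pattern: build a scope, build a role definition ID, and create an assignment with a random `uuid4()` name. The following minimal sketch factors that pattern into hypothetical helpers. The scope formats and the two built-in role GUIDs are copied from the cells above. `role_assignment_name` is an assumption of this sketch, not something the tutorial uses: it derives a deterministic name with `uuid.uuid5`, similar in spirit to Bicep's `guid()` function, so that a rerun targets the same assignment name instead of a new random one. Keep the `ResourceExistsError` handling either way.

```python
import uuid

# Built-in role definition GUIDs used in the tutorial cells.
CONTRIBUTOR_ROLE_ID = "b24988ac-6180-42a0-ab88-20f7382dd24c"
AZUREML_DATA_SCIENTIST_ROLE_ID = "f6c7c914-8db3-469d-8ca1-694a8f32e121"


def redis_scope(subscription_id: str, resource_group: str, redis_name: str) -> str:
    return (
        f"/subscriptions/{subscription_id}/resourceGroups/{resource_group}"
        f"/providers/Microsoft.Cache/Redis/{redis_name}"
    )


def workspace_scope(subscription_id: str, resource_group: str, workspace_name: str) -> str:
    return (
        f"/subscriptions/{subscription_id}/resourceGroups/{resource_group}"
        f"/providers/Microsoft.MachineLearningServices/workspaces/{workspace_name}"
    )


def role_definition_arm_id(subscription_id: str, role_guid: str) -> str:
    return (
        f"/subscriptions/{subscription_id}"
        f"/providers/Microsoft.Authorization/roleDefinitions/{role_guid}"
    )


def role_assignment_name(scope: str, principal_id: str, role_guid: str) -> str:
    """Hypothetical helper: deterministic GUID for a (scope, principal, role) triple."""
    # ARM IDs are case-insensitive, so normalize the inputs before hashing.
    key = "|".join(part.lower() for part in (scope, principal_id, role_guid))
    return str(uuid.uuid5(uuid.NAMESPACE_URL, key))
```

To try it, replace `str(uuid4())` in the Redis cell with `role_assignment_name(scope, model_endpoint_msi_principal_id, CONTRIBUTOR_ROLE_ID)`.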

### Deploy the model to the online endpoint
First, inspect the scoring script `project/fraud_model/online_inference/src/scoring.py`. The scoring script performs the following tasks:

1. Loads the feature metadata from the feature retrieval specification that was packaged along with the model during model training (tutorial 3 of this series). This specification has features from both the `transactions` and `accounts` feature sets.
2. When an inference request is received, looks up the online features using the index keys from the request. In this case, the index column for both feature sets is `accountID`.
3. Passes the features to the model to perform inference and returns the response, a boolean value representing the variable `is_fraud`.
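The three steps above follow the usual Azure ML scoring-script shape: an `init()` function that runs once when the deployment starts and a `run()` function that handles each request. The following minimal sketch mirrors that flow in plain Python so it can run without Azure. Everything specific in it is hypothetical: the in-memory `_online_store` dictionary stands in for the feature store lookup, `_toy_model` stands in for the trained model, and the feature name `txn_amount_7d_sum` and the request shape `{"accountID": [...]}` are invented for illustration. Inspect `scoring.py` and `test.json` for the real details.

```python
import json
from typing import Callable, Dict, List, Optional

# Hypothetical stand-ins: the real scoring.py uses the feature store SDK for
# lookups and the model trained in tutorial 3. Names and values are illustrative.
_online_store: Dict[str, Dict[str, float]] = {}
_model: Optional[Callable[[Dict[str, float]], int]] = None


def _toy_model(features: Dict[str, float]) -> int:
    # Toy rule, not the trained model: flag 7-day spend above a fixed threshold.
    return int(features["txn_amount_7d_sum"] > 1000.0)


def init() -> None:
    """Runs once when the deployment starts: load lookup metadata and the model."""
    global _model
    _online_store.update(
        {
            "A985156952816816": {"txn_amount_7d_sum": 120.0},
            "A1055521248929430": {"txn_amount_7d_sum": 5400.0},
        }
    )
    _model = _toy_model


def run(raw_data: str) -> str:
    """Runs per request: look up features by accountID, then score each row."""
    if _model is None:
        raise RuntimeError("init() has not been called")
    request = json.loads(raw_data)
    predictions: List[int] = []
    for account_id in request["accountID"]:
        features = _online_store.get(account_id)
        if features is None:
            raise KeyError(f"No online features for accountID {account_id!r} (expired TTL?)")
        predictions.append(_model(features))
    return json.dumps({"is_fraud": predictions})
```

Keeping the lookup inside `run()` means that every request reads the latest materialized values. Keeping the model load inside `init()` avoids paying that cost per request.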

Next, create the managed online deployment definition for the model deployment by executing the following code cell:

In [None]:
deployment = ManagedOnlineDeployment(
    name="green",
    endpoint_name=endpoint_name,
    model="azureml:fraud_model:1",
    code_configuration=CodeConfiguration(
        code=root_dir + "/project/fraud_model/online_inference/src/",
        scoring_script="scoring.py",
    ),
    environment=Environment(
        conda_file=root_dir + "/project/fraud_model/online_inference/conda.yml",
        image="mcr.microsoft.com/azureml/openmpi4.1.0-ubuntu20.04",
    ),
    instance_type="Standard_DS3_v2",
    instance_count=1,
)

Then, deploy the model to the online endpoint by executing the following code cell. Deployment may take approximately 4-5 minutes.

In [None]:
# Model deployment to the online endpoint may take 4-5 minutes.
ws_client.online_deployments.begin_create_or_update(deployment).result()

### Test online deployment with mock data
Finally, execute the following code to test the online deployment using the mock data. You should see `0` or `1` as the output of this cell.

In [None]:
# Test the online deployment using the mock data.
sample_data = root_dir + "/project/fraud_model/online_inference/test.json"
ws_client.online_endpoints.invoke(
    endpoint_name=endpoint_name, request_file=sample_data, deployment_name="green"
)

## Cleanup

Tutorial "5. Develop a feature set with custom source" has instructions for deleting the resources.