# Analysis of Synthetic Data

This notebook demonstrates a hypothetical scenario: predicting how likely a programmer is to be given access to a GPT-2 model for inference, based on information such as their favorite programming language, preference for tabs vs. spaces, OS, location and so forth. Each programmer is assigned a score in [0, 10], where a score in [7, 10] means access is granted and a score in [0, 7) means access is denied. The data were synthetically generated via the [PyPI package, Fibber.io](https://pypi.org/project/fibber/).
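
The access rule itself is a simple threshold on the predicted score. A minimal sketch, assuming a hypothetical helper `access_decision` (not part of this notebook's pipeline or any library) that applies the [7, 10] cut-off:

```python
# Hypothetical helper illustrating the access rule described above;
# it is not part of the notebook's pipeline or any Azure ML API.
ACCESS_THRESHOLD = 7.0


def access_decision(score: float) -> str:
    """Return "granted" for scores >= 7 and "denied" otherwise.

    A regression model can predict values slightly outside [0, 10];
    such values still fall on the obvious side of the threshold.
    """
    return "granted" if score >= ACCESS_THRESHOLD else "denied"


for example_score in [0.0, 6.99, 7.0, 10.0]:
    print(example_score, access_decision(example_score))
```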

First, we need to specify the version of the RAI components which are available in the workspace. This was specified when the components were uploaded, and will have defaulted to '1':

In [None]:
version_string = "1"

We also need to give the name of the compute cluster we want to use in AzureML. Later in this notebook, we will create it if it does not already exist:

In [None]:
compute_name = "rai-cluster"

Finally, we need to specify a version for the data and components we will create while running this notebook. This should be unique for the workspace, but the specific value doesn't matter:

In [None]:
rai_programmer_example_version_string = "1"

## Configure workspace details and get a handle to the workspace

To connect to a workspace, we need three identifier parameters: a subscription, a resource group and a workspace name. We will use these details in the `MLClient` from `azure.ai.ml` to get a handle to the required Azure Machine Learning workspace.

In [None]:
# Enter details of your AML workspace
subscription_id = "<SUBSCRIPTION_ID>"
resource_group = "<RESOURCE_GROUP>"
workspace = "<AML_WORKSPACE_NAME>"
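
Together, these three identifiers pin down a single Azure Resource Manager resource. The sketch below uses a hypothetical helper, `workspace_resource_id`, to show the standard resource ID they form. `MLClient` does not require you to build this string; it is shown only because the same path shape reappears, in shortened form, in the `wsid` query parameter of the dashboard URL computed at the end of this notebook.

```python
def workspace_resource_id(subscription_id: str, resource_group: str, workspace: str) -> str:
    """Build the Azure Resource Manager ID of an Azure Machine Learning workspace."""
    return (
        f"/subscriptions/{subscription_id}"
        f"/resourceGroups/{resource_group}"
        "/providers/Microsoft.MachineLearningServices"
        f"/workspaces/{workspace}"
    )


print(workspace_resource_id("<SUBSCRIPTION_ID>", "<RESOURCE_GROUP>", "<AML_WORKSPACE_NAME>"))
```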

In [None]:
# Handle to the workspace
from azure.ai.ml import MLClient
from azure.identity import DefaultAzureCredential

credential = DefaultAzureCredential()
ml_client = MLClient(
    credential=credential,
    subscription_id=subscription_id,
    resource_group_name=resource_group,
    workspace_name=workspace,
)
print(ml_client)

In [None]:
# Get handle to azureml registry for the RAI built-in components
registry_name = "azureml"
ml_client_registry = MLClient(
    credential=credential,
    subscription_id=subscription_id,
    resource_group_name=resource_group,
    registry_name=registry_name,
)
print(ml_client_registry)

## Accessing the Data

We supply the synthetic data as a pair of Parquet files (train and test), each with an accompanying `MLTable` file. We can read them in and take a brief look:

In [None]:
import os
import pandas as pd

Now define the paths to the data:

In [None]:
train_data_path = "data-programmer-regression/train/"

In [None]:
test_data_path = "data-programmer-regression/test/"

Load some data for a quick view:

In [None]:
import mltable

tbl = mltable.load(train_data_path)
train_df: pd.DataFrame = tbl.to_pandas_dataframe()

# the test dataset should have at most 5000 rows
test_df = mltable.load(test_data_path).to_pandas_dataframe()
assert len(test_df.index) <= 5000

display(train_df)

The (synthetic) data are about a collection of programmers, with a 'score' column which we wish to predict:

In [None]:
target_column_name = "score"

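Next, we need to upload the datasets to our workspace. If data assets with these names and this version are already registered, they are reused; otherwise they are created: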

In [None]:
from azure.ai.ml.entities import Data
from azure.ai.ml.constants import AssetTypes

input_train_data = "Programmers_Train_MLTable"
input_test_data = "Programmers_Test_MLTable"

try:
    # Try getting data already registered in workspace
    train_data = ml_client.data.get(
        name=input_train_data, version=rai_programmer_example_version_string
    )
    test_data = ml_client.data.get(
        name=input_test_data, version=rai_programmer_example_version_string
    )
except Exception as e:
    # If no data of specified version exist, create new one
    train_data = Data(
        path=train_data_path,
        type=AssetTypes.MLTABLE,
        description="RAI programmers training data",
        name=input_train_data,
        version=rai_programmer_example_version_string,
    )
    ml_client.data.create_or_update(train_data)

    test_data = Data(
        path=test_data_path,
        type=AssetTypes.MLTABLE,
        description="RAI programmers test data",
        name=input_test_data,
        version=rai_programmer_example_version_string,
    )
    ml_client.data.create_or_update(test_data)
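
The cell above follows a get-or-create pattern, so re-running the notebook does not try to register the same name and version twice. A self-contained sketch of that pattern, using a hypothetical `InMemoryDataRegistry` in place of `ml_client.data` (the real client talks to the workspace and raises its own exception types):

```python
class InMemoryDataRegistry:
    """Hypothetical stand-in for ml_client.data, keyed by (name, version)."""

    def __init__(self) -> None:
        self._assets: dict = {}

    def get(self, name: str, version: str) -> dict:
        try:
            return self._assets[(name, version)]
        except KeyError:
            raise LookupError(f"data asset {name}:{version} not found") from None

    def create_or_update(self, asset: dict) -> dict:
        self._assets[(asset["name"], asset["version"])] = asset
        return asset


def get_or_create(registry: InMemoryDataRegistry, name: str, version: str, path: str):
    """Return (asset, created), where created is True only if the asset was new."""
    try:
        return registry.get(name=name, version=version), False
    except LookupError:
        asset = {"name": name, "version": version, "path": path}
        return registry.create_or_update(asset), True


registry = InMemoryDataRegistry()
_, created_first = get_or_create(registry, "Programmers_Train_MLTable", "1", "data-programmer-regression/train/")
_, created_again = get_or_create(registry, "Programmers_Train_MLTable", "1", "data-programmer-regression/train/")
print(created_first, created_again)  # True False
```

In the real cell, catching a narrower exception than `Exception` (the Azure SDK typically raises `azure.core.exceptions.ResourceNotFoundError` for a missing asset) would avoid masking authentication or network failures.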

## Creating the Model

To simplify the model creation process, we're going to use a pipeline.

We create directories for the training script and the model registration script:

In [None]:
import os

os.makedirs("register_model_src", exist_ok=True)
os.makedirs("programmer_component_src", exist_ok=True)

Next, we write out our training script:

In [None]:
%%writefile programmer_component_src/training_script_reg.py

import argparse
import os
import shutil
import tempfile


from azureml.core import Run

import mlflow
import mlflow.sklearn

import mltable

import pandas as pd
from sklearn.linear_model import LinearRegression
from sklearn.pipeline import Pipeline
from sklearn.impute import SimpleImputer
from sklearn.preprocessing import StandardScaler, OneHotEncoder
from sklearn.compose import ColumnTransformer
from sklearn.model_selection import train_test_split

def parse_args():
    # setup arg parser
    parser = argparse.ArgumentParser()

    # add arguments
    parser.add_argument("--training_data", type=str, help="Path to training data")
    parser.add_argument("--target_column_name", type=str, help="Name of target column")
    parser.add_argument("--model_output", type=str, help="Path of output model")

    # parse args
    args = parser.parse_args()

    # return args
    return args

def create_regression_pipeline(X, y):
    pipe_cfg = {
        'num_cols': X.dtypes[X.dtypes == 'int64'].index.values.tolist(),
        'cat_cols': X.dtypes[X.dtypes == 'object'].index.values.tolist(),
    }
    num_pipe = Pipeline([
        ('num_imputer', SimpleImputer(strategy='median')),
        ('num_scaler', StandardScaler())
    ])
    cat_pipe = Pipeline([
        ('cat_imputer', SimpleImputer(strategy='constant', fill_value='?')),
        ('cat_encoder', OneHotEncoder(handle_unknown='ignore', sparse_output=False))
    ])
    feat_pipe = ColumnTransformer([
        ('num_pipe', num_pipe, pipe_cfg['num_cols']),
        ('cat_pipe', cat_pipe, pipe_cfg['cat_cols'])
    ])

    # Append the regressor to the preprocessing pipeline.
    # Now we have a full prediction pipeline.
    pipeline = Pipeline(steps=[('preprocessor', feat_pipe),
                               ('model', LinearRegression())])
    return pipeline.fit(X, y)

def main(args):
    current_experiment = Run.get_context().experiment
    tracking_uri = current_experiment.workspace.get_mlflow_tracking_uri()
    print("tracking_uri: {0}".format(tracking_uri))
    mlflow.set_tracking_uri(tracking_uri)
    mlflow.set_experiment(current_experiment.name)
    
    # Read in data
    print("Reading data")
    tbl = mltable.load(args.training_data)
    all_data = tbl.to_pandas_dataframe()

    print("Extracting X_train, y_train")
    print("all_data cols: {0}".format(all_data.columns))
    y_train = all_data[args.target_column_name]
    X_train = all_data.drop(labels=args.target_column_name, axis="columns")
    print("X_train cols: {0}".format(X_train.columns))

    print("Training model")
    # The estimator can be changed to suit
    model = create_regression_pipeline(X_train, y_train)

    # Saving model with mlflow - leave this section unchanged
    with tempfile.TemporaryDirectory() as td:
        print("Saving model with MLFlow to temporary directory")
        tmp_output_dir = os.path.join(td, "my_model_dir")
        mlflow.sklearn.save_model(sk_model=model, path=tmp_output_dir)

        print("Copying MLFlow model to output path")
        for file_name in os.listdir(tmp_output_dir):
            print("  Copying: ", file_name)
            # Since Python 3.8, shutil.copytree(..., dirs_exist_ok=True) could
            # replace this listdir loop
            shutil.copy2(src=os.path.join(tmp_output_dir, file_name), dst=os.path.join(args.model_output, file_name))


# run script
if __name__ == "__main__":
    # add space in logs
    print("*" * 60)
    print("\n\n")

    # parse args
    args = parse_args()

    # run main function
    main(args)

    # add space in logs
    print("*" * 60)
    print("\n\n")
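
`create_regression_pipeline` decides which columns are numeric or categorical purely from their pandas dtype names: `int64` columns get median imputation and scaling, and `object` columns get one-hot encoding. Because `ColumnTransformer` drops unlisted columns by default (`remainder="drop"`), a column of any other dtype, such as `float64`, would be silently ignored. The stdlib sketch below mirrors that selection logic on a plain dict of dtype names; the dtypes shown are illustrative, and the `"rating"` column is hypothetical, included only to show the ignored case:

```python
def partition_columns(dtypes: dict) -> tuple:
    """Mirror the dtype-based column selection in create_regression_pipeline.

    Returns (numeric, categorical, ignored) lists of column names.
    """
    numeric = [col for col, dtype in dtypes.items() if dtype == "int64"]
    categorical = [col for col, dtype in dtypes.items() if dtype == "object"]
    ignored = [col for col in dtypes if col not in numeric and col not in categorical]
    return numeric, categorical, ignored


# Dtype names here are illustrative; "rating" is a hypothetical float column.
example_dtypes = {
    "YOE": "int64",
    "age": "int64",
    "style": "object",
    "IDE": "object",
    "rating": "float64",
}
numeric, categorical, ignored = partition_columns(example_dtypes)
print(numeric, categorical, ignored)
```

If your data include float columns, selecting numeric columns with `X.select_dtypes(include="number").columns` instead would keep them in the pipeline.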

In [None]:
%%writefile register_model_src/register.py

# ---------------------------------------------------------
# Copyright (c) Microsoft Corporation. All rights reserved.
# ---------------------------------------------------------

import argparse
import json
import os
import time


from azureml.core import Run

import mlflow
import mlflow.sklearn

# Based on example:
# https://docs.microsoft.com/en-us/azure/machine-learning/how-to-train-cli
# which references
# https://github.com/Azure/azureml-examples/tree/main/cli/jobs/train/lightgbm/iris


def parse_args():
    # setup arg parser
    parser = argparse.ArgumentParser()

    # add arguments
    parser.add_argument("--model_input_path", type=str, help="Path to input model")
    parser.add_argument(
        "--model_info_output_path", type=str, help="Path to write model info JSON"
    )
    parser.add_argument(
        "--model_base_name", type=str, help="Name of the registered model"
    )
    parser.add_argument(
        "--model_name_suffix", type=int, help="Set negative to use epoch_secs"
    )

    # parse args
    args = parser.parse_args()

    # return args
    return args


def main(args):
    current_experiment = Run.get_context().experiment
    tracking_uri = current_experiment.workspace.get_mlflow_tracking_uri()
    print("tracking_uri: {0}".format(tracking_uri))
    mlflow.set_tracking_uri(tracking_uri)
    mlflow.set_experiment(current_experiment.name)

    print("Loading model")
    mlflow_model = mlflow.sklearn.load_model(args.model_input_path)

    if args.model_name_suffix < 0:
        suffix = int(time.time())
    else:
        suffix = args.model_name_suffix
    registered_name = "{0}_{1}".format(args.model_base_name, suffix)
    print(f"Registering model as {registered_name}")

    print("Registering via MLFlow")
    mlflow.sklearn.log_model(
        sk_model=mlflow_model,
        registered_model_name=registered_name,
        artifact_path=registered_name,
    )

    print("Writing JSON")
    model_info = {"id": "{0}:1".format(registered_name)}
    output_path = os.path.join(args.model_info_output_path, "model_info.json")
    with open(output_path, "w") as of:
        json.dump(model_info, fp=of)


# run script
if __name__ == "__main__":
    # add space in logs
    print("*" * 60)
    print("\n\n")

    # parse args
    args = parse_args()

    # run main function
    main(args)

    # add space in logs
    print("*" * 60)
    print("\n\n")
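
`register.py` derives the registered model name from a base name and a suffix (a negative suffix means "use the current epoch seconds"), and writes a model ID that hard-codes version 1. That is only correct the first time a given name is registered, which is why the notebook generates a fresh `model_name_suffix` from `time.time()` for each run. The sketch below reproduces that naming logic in isolation; the helper names are hypothetical:

```python
import time


def registered_model_name(base_name: str, suffix: int) -> str:
    """Same rule as register.py: a negative suffix is replaced by epoch seconds."""
    if suffix < 0:
        suffix = int(time.time())
    return f"{base_name}_{suffix}"


def model_id(registered_name: str, version: int = 1) -> str:
    """Model IDs in this notebook have the form <name>:<version>; register.py assumes version 1."""
    return f"{registered_name}:{version}"


name = registered_model_name("rai_programmer_example_reg", 1700000000)
print(name)            # rai_programmer_example_reg_1700000000
print(model_id(name))  # rai_programmer_example_reg_1700000000:1
```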


Now, we can build each script into an AzureML component, starting with the training script:

In [None]:
from azure.ai.ml import load_component

yaml_contents = f"""
$schema: http://azureml/sdk-2-0/CommandComponent.json
name: rai_programmers_training_component
display_name: Programmers training component for RAI example
version: {rai_programmer_example_version_string}
type: command
inputs:
  training_data:
    type: path
  target_column_name:
    type: string
outputs:
  model_output:
    type: path
code: ./programmer_component_src/
environment: azureml://registries/azureml/environments/responsibleai-tabular/versions/14
command: >-
  python training_script_reg.py
  --training_data ${{{{inputs.training_data}}}}
  --target_column_name ${{{{inputs.target_column_name}}}}
  --model_output ${{{{outputs.model_output}}}}
"""

yaml_filename = "ProgrammersRegTrainingComp.yaml"

with open(yaml_filename, "w") as f:
    f.write(yaml_contents)

train_model_component = load_component(source=yaml_filename)

In [None]:
yaml_contents = f"""
$schema: http://azureml/sdk-2-0/CommandComponent.json
name: register_model
display_name: Register Model
version: {rai_programmer_example_version_string}
type: command
is_deterministic: False
inputs:
  model_input_path:
    type: path
  model_base_name:
    type: string
  model_name_suffix: # Set negative to use epoch_secs
    type: integer
    default: -1
outputs:
  model_info_output_path:
    type: path
code: ./register_model_src/
environment: azureml://registries/azureml/environments/responsibleai-tabular/versions/14
command: >-
  python register.py
  --model_input_path ${{{{inputs.model_input_path}}}}
  --model_base_name ${{{{inputs.model_base_name}}}}
  --model_name_suffix ${{{{inputs.model_name_suffix}}}}
  --model_info_output_path ${{{{outputs.model_info_output_path}}}}

"""

yaml_filename = "register.yaml"

with open(yaml_filename, "w") as f:
    f.write(yaml_contents)

register_component = load_component(source=yaml_filename)
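
Both YAML definitions are built with Python f-strings, so literal braces must be doubled: `{{` renders as `{`. The AzureML command syntax needs `${{inputs.name}}` in the final YAML, which is why the f-strings contain four braces on each side. A small check of that escaping, using a trimmed-down template:

```python
rai_programmer_example_version_string = "1"

# Four braces in the f-string become two in the rendered text,
# while a single-brace field such as {rai_programmer_example_version_string} is substituted.
yaml_snippet = f"""
version: {rai_programmer_example_version_string}
command: >-
  python training_script_reg.py
  --training_data ${{{{inputs.training_data}}}}
"""
print(yaml_snippet)
```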

We need a compute target on which to run our jobs. The following checks whether the compute specified above is present; if not, then the compute target is created.

In [None]:
from azure.ai.ml.entities import AmlCompute

all_compute_names = [x.name for x in ml_client.compute.list()]

if compute_name in all_compute_names:
    print(f"Found existing compute: {compute_name}")
else:
    my_compute = AmlCompute(
        name=compute_name,
        size="Standard_D2_v2",
        min_instances=0,
        max_instances=4,
        idle_time_before_scale_down=3600,
    )
    ml_client.compute.begin_create_or_update(my_compute).result()
    print(f"Created compute: {compute_name}")

## Running a training pipeline

Now that we have our training component, we can run it. We begin by generating a unique name for the model:

In [None]:
import time

model_name_suffix = int(time.time())
model_name = "rai_programmer_example_reg"

Next, we define our training pipeline. This has two components. The first is the training component which we defined above. The second is a component to register the model in AzureML:

In [None]:
from azure.ai.ml import dsl, Input

programmers_train_mltable = Input(
    type="mltable",
    path=f"azureml:{input_train_data}:{rai_programmer_example_version_string}",
    mode="download",
)
programmers_test_mltable = Input(
    type="mltable",
    path=f"azureml:{input_test_data}:{rai_programmer_example_version_string}",
    mode="download",
)


@dsl.pipeline(
    compute=compute_name,
    description="Register Model for RAI Programmers example",
    experiment_name=f"RAI_Programmers_Example_Model_Training_{model_name_suffix}",
)
def my_training_pipeline(target_column_name, training_data):
    trained_model = train_model_component(
        target_column_name=target_column_name, training_data=training_data
    )
    trained_model.set_limits(timeout=3600)

    _ = register_component(
        model_input_path=trained_model.outputs.model_output,
        model_base_name=model_name,
        model_name_suffix=model_name_suffix,
    )

    return {}


model_registration_pipeline_job = my_training_pipeline(
    target_column_name, programmers_train_mltable
)

With the training pipeline defined, we can submit it for execution in AzureML. We define a helper function that submits the job and then polls until it reaches a terminal state:

In [None]:
import time

from azure.ai.ml.entities import PipelineJob
from IPython.display import HTML, display


def submit_and_wait(ml_client, pipeline_job) -> PipelineJob:
    created_job = ml_client.jobs.create_or_update(pipeline_job)
    assert created_job is not None

    print("Pipeline job can be accessed in the following URL:")
    display(HTML('<a href="{0}">{0}</a>'.format(created_job.studio_url)))

    while created_job.status not in [
        "Completed",
        "Failed",
        "Canceled",
        "NotResponding",
    ]:
        time.sleep(30)
        created_job = ml_client.jobs.get(created_job.name)
        print("Latest status : {0}".format(created_job.status))
    assert created_job.status == "Completed"
    return created_job


# This is the actual submission
training_job = submit_and_wait(ml_client, model_registration_pipeline_job)
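
`submit_and_wait` polls until the job reaches one of four terminal statuses, but it places no upper bound on how long it waits. A sketch of the same loop with an explicit timeout, assuming a caller-supplied `get_status` function; in this notebook that would be something like `lambda: ml_client.jobs.get(job_name).status`, and here a simulated status sequence stands in for it. The helper name and the default timeout are illustrative choices:

```python
import time
from typing import Callable

TERMINAL_STATUSES = {"Completed", "Failed", "Canceled", "NotResponding"}


def wait_for_terminal_status(
    get_status: Callable[[], str],
    poll_seconds: float = 30.0,
    timeout_seconds: float = 4 * 3600,
) -> str:
    """Poll get_status() until it returns a terminal status or the timeout expires."""
    deadline = time.monotonic() + timeout_seconds
    status = get_status()
    while status not in TERMINAL_STATUSES:
        if time.monotonic() >= deadline:
            raise TimeoutError(f"job still {status!r} after {timeout_seconds} seconds")
        time.sleep(poll_seconds)
        status = get_status()
    return status


# Simulated job: two non-terminal polls, then completion.
simulated = iter(["Running", "Running", "Completed"])
final_status = wait_for_terminal_status(lambda: next(simulated), poll_seconds=0)
print(final_status)  # Completed
```

The `set_limits(timeout=...)` calls in the pipelines bound each step inside AzureML; a client-side timeout like this one only bounds how long the notebook itself waits.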

## Creating the RAI Insights

Now that we have our model, we can generate RAI insights for it. We will need the `id` of the registered model, which will be as follows:

In [None]:
expected_model_id = f"{model_name}_{model_name_suffix}:1"
azureml_model_id = f"azureml:{expected_model_id}"

Next, we load the RAI components, so that we can construct a pipeline:

In [None]:
label = "latest"

rai_constructor_component = ml_client_registry.components.get(
    name="rai_tabular_insight_constructor", label=label
)

# We get latest version and use the same version for all components
version = rai_constructor_component.version
print("The current version of RAI built-in components is: " + version)

rai_explanation_component = ml_client_registry.components.get(
    name="rai_tabular_explanation", version=version
)

rai_causal_component = ml_client_registry.components.get(
    name="rai_tabular_causal", version=version
)

rai_counterfactual_component = ml_client_registry.components.get(
    name="rai_tabular_counterfactual", version=version
)

rai_erroranalysis_component = ml_client_registry.components.get(
    name="rai_tabular_erroranalysis", version=version
)

rai_gather_component = ml_client_registry.components.get(
    name="rai_tabular_insight_gather", version=version
)

rai_scorecard_component = ml_client_registry.components.get(
    name="rai_tabular_score_card", version=version
)

## Score card generation config
For score card generation, we need some additional configuration in a separate JSON file. Here we configure the following model performance metrics for reporting:
- mean absolute error
- mean squared error

In [None]:
import json

score_card_config_dict = {
    "Model": {
        "ModelName": "GPT2 Access",
        "ModelType": "Regression",
        "ModelSummary": "This is a regression model to analyze how likely a programmer is to be given access to GPT-2",
    },
    "Metrics": {"mean_absolute_error": {"threshold": "<=20"}, "mean_squared_error": {}},
    "FeatureImportance": {"top_n": 6},
    "DataExplorer": {"features": ["YOE", "age"]},
    "Fairness": {
        "metric": ["mean_squared_error", "mean_absolute_error"],
        "sensitive_features": ["IDE", "style"],
        "fairness_evaluation_kind": "difference",
    },
}

score_card_config_filename = "rai_programmer_regression_score_card_config.json"

with open(score_card_config_filename, "w") as f:
    json.dump(score_card_config_dict, f)
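
The `"threshold": "<=20"` entry pairs a comparison operator with a number. We have not checked exactly how the scorecard component parses this string; the sketch below assumes the natural reading (the metric passes when its value is at most 20) and uses hypothetical helper names and made-up metric values purely for illustration:

```python
import operator
import re

_COMPARATORS = {
    "<=": operator.le,
    ">=": operator.ge,
    "<": operator.lt,
    ">": operator.gt,
}
# Two-character operators are listed first in the alternation so "<=" is not read as "<".
_THRESHOLD_RE = re.compile(r"\s*(<=|>=|<|>)\s*(-?\d+(?:\.\d+)?)\s*")


def parse_threshold(spec: str) -> tuple:
    """Split a threshold string such as "<=20" into ("<=", 20.0)."""
    match = _THRESHOLD_RE.fullmatch(spec)
    if match is None:
        raise ValueError(f"unrecognised threshold: {spec!r}")
    return match.group(1), float(match.group(2))


def meets_threshold(value: float, spec: str) -> bool:
    op, limit = parse_threshold(spec)
    return _COMPARATORS[op](value, limit)


metrics_config = {"mean_absolute_error": {"threshold": "<=20"}, "mean_squared_error": {}}
example_values = {"mean_absolute_error": 1.3, "mean_squared_error": 2.8}  # made-up numbers

for metric, settings in metrics_config.items():
    if "threshold" in settings:
        print(metric, meets_threshold(example_values[metric], settings["threshold"]))
    else:
        print(metric, "reported without a threshold")
```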

We can now specify our pipeline. Complex objects (such as lists of column names) have to be converted to JSON strings before being passed to the components. Note that every job is given a generous timeout of 7200 seconds, since some steps, such as generating counterfactual points, are comparatively slow:

In [None]:
import json
from azure.ai.ml import Input
from azure.ai.ml.constants import AssetTypes

score_card_config_path = Input(
    type="uri_file", path=score_card_config_filename, mode="download"
)

categorical_columns = json.dumps(
    ["location", "style", "job title", "OS", "Employer", "IDE", "Programming language"]
)
treatment_features = json.dumps(["Number of github repos contributed to", "YOE"])
desired_range = json.dumps([5, 10])
filter_columns = json.dumps(["style", "Employer"])


@dsl.pipeline(
    compute=compute_name,
    description="Example RAI computation on programmers data",
    experiment_name=f"RAI_Programmers_Example_RAIInsights_Computation_{model_name_suffix}",
)
def rai_programmer_regression_pipeline(
    target_column_name,
    train_data,
    test_data,
    score_card_config_path,
):
    # Initiate the RAIInsights
    create_rai_job = rai_constructor_component(
        title="RAI Dashboard Example",
        task_type="regression",
        model_info=expected_model_id,
        model_input=Input(type=AssetTypes.MLFLOW_MODEL, path=azureml_model_id),
        train_dataset=train_data,
        test_dataset=test_data,
        target_column_name=target_column_name,
        categorical_column_names=categorical_columns,
        # If your model has extra dependencies and your Responsible AI job fails to
        # load the MLflow model with a ValueError, try setting use_model_dependency to True.
        # If you have further questions, contact askamlrai@microsoft.com
        use_model_dependency=True,
    )
    create_rai_job.set_limits(timeout=7200)

    # Add an explanation
    explain_job = rai_explanation_component(
        comment="Explanation for the programmers dataset",
        rai_insights_dashboard=create_rai_job.outputs.rai_insights_dashboard,
    )
    explain_job.set_limits(timeout=7200)

    # Add causal analysis
    causal_job = rai_causal_component(
        rai_insights_dashboard=create_rai_job.outputs.rai_insights_dashboard,
        treatment_features=treatment_features,
    )
    causal_job.set_limits(timeout=7200)

    # Add counterfactual analysis
    counterfactual_job = rai_counterfactual_component(
        rai_insights_dashboard=create_rai_job.outputs.rai_insights_dashboard,
        total_cfs=10,
        desired_range=desired_range,
    )
    counterfactual_job.set_limits(timeout=7200)

    # Add error analysis
    erroranalysis_job = rai_erroranalysis_component(
        rai_insights_dashboard=create_rai_job.outputs.rai_insights_dashboard,
        filter_features=filter_columns,
    )
    erroranalysis_job.set_limits(timeout=7200)

    # Combine everything
    rai_gather_job = rai_gather_component(
        constructor=create_rai_job.outputs.rai_insights_dashboard,
        insight_1=explain_job.outputs.explanation,
        insight_2=causal_job.outputs.causal,
        insight_3=counterfactual_job.outputs.counterfactual,
        insight_4=erroranalysis_job.outputs.error_analysis,
    )
    rai_gather_job.set_limits(timeout=7200)

    rai_gather_job.outputs.dashboard.mode = "upload"
    rai_gather_job.outputs.ux_json.mode = "upload"

    # Generate a score card in PDF format as a summary report on model performance,
    # and observe the distribution of error between predictions and ground truth.
    rai_scorecard_job = rai_scorecard_component(
        dashboard=rai_gather_job.outputs.dashboard,
        pdf_generation_config=score_card_config_path,
    )

    return {
        "dashboard": rai_gather_job.outputs.dashboard,
        "ux_json": rai_gather_job.outputs.ux_json,
        "scorecard": rai_scorecard_job.outputs.scorecard,
    }
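
Component inputs such as `categorical_column_names` and `desired_range` are passed as strings, hence the `json.dumps` calls above. A minimal sketch of the round trip, assuming (we have not checked the component source) that each component decodes its argument with `json.loads`. Column names containing spaces, such as `"job title"`, survive intact because JSON quotes each element:

```python
import json

categorical_columns = json.dumps(
    ["location", "style", "job title", "OS", "Employer", "IDE", "Programming language"]
)
desired_range = json.dumps([5, 10])

# What the pipeline actually passes: plain strings.
print(type(categorical_columns).__name__, categorical_columns)

# What a component would recover after decoding (assumed json.loads on the receiving side).
decoded_columns = json.loads(categorical_columns)
decoded_range = json.loads(desired_range)
print(decoded_columns[2], decoded_range)
```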

Next, we define the pipeline object itself, and ensure that the outputs will be available for download:

In [None]:
import uuid
from azure.ai.ml import Output

insights_pipeline_job = rai_programmer_regression_pipeline(
    target_column_name=target_column_name,
    train_data=programmers_train_mltable,
    test_data=programmers_test_mltable,
    score_card_config_path=score_card_config_path,
)

rand_path = str(uuid.uuid4())
insights_pipeline_job.outputs.dashboard = Output(
    path=f"azureml://datastores/workspaceblobstore/paths/{rand_path}/dashboard/",
    mode="upload",
    type="uri_folder",
)
insights_pipeline_job.outputs.ux_json = Output(
    path=f"azureml://datastores/workspaceblobstore/paths/{rand_path}/ux_json/",
    mode="upload",
    type="uri_folder",
)
insights_pipeline_job.outputs.scorecard = Output(
    path=f"azureml://datastores/workspaceblobstore/paths/{rand_path}/scorecard/",
    mode="upload",
    type="uri_folder",
)
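
All three outputs share one randomly generated folder on the `workspaceblobstore` datastore, so each submission writes to fresh paths instead of overwriting a previous run's dashboard. A sketch of that path construction with a hypothetical `output_uri` helper:

```python
import uuid


def output_uri(run_folder: str, output_name: str, datastore: str = "workspaceblobstore") -> str:
    """Build an azureml:// datastore URI in the same shape as the cell above."""
    return f"azureml://datastores/{datastore}/paths/{run_folder}/{output_name}/"


rand_path = str(uuid.uuid4())
output_uris = {name: output_uri(rand_path, name) for name in ["dashboard", "ux_json", "scorecard"]}
for name, uri in output_uris.items():
    print(name, uri)
```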

And submit the pipeline to AzureML for execution:

In [None]:
insights_job = submit_and_wait(ml_client, insights_pipeline_job)

The dashboard should appear in the AzureML portal in the registered model view. The following cell computes the expected URI:

In [None]:
sub_id = ml_client._operation_scope.subscription_id
rg_name = ml_client._operation_scope.resource_group_name
ws_name = ml_client.workspace_name

expected_uri = f"https://ml.azure.com/model/{expected_model_id}/model_analysis?wsid=/subscriptions/{sub_id}/resourcegroups/{rg_name}/workspaces/{ws_name}"

print(f"Please visit {expected_uri} to see your analysis")

## Downloading the Scorecard PDF

We can download the scorecard PDF from our pipeline as follows:

In [None]:
target_directory = "."

ml_client.jobs.download(
    insights_job.name, download_path=target_directory, output_name="scorecard"
)
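
`ml_client.jobs.download` places the output somewhere under `target_directory`, and the exact sub-folder layout may depend on the SDK version. Rather than hard-coding it, a sketch like the following searches recursively for PDF files; the `named-outputs/scorecard` folder in the demo is only an assumed layout used to exercise the hypothetical `find_pdfs` helper:

```python
import tempfile
from pathlib import Path


def find_pdfs(root: str = ".") -> list:
    """Return all PDF files below root, sorted for stable output."""
    return sorted(Path(root).rglob("*.pdf"))


# Demo on a throwaway directory tree (assumed layout, not the SDK's guaranteed one).
with tempfile.TemporaryDirectory() as demo_root:
    output_dir = Path(demo_root, "named-outputs", "scorecard")
    output_dir.mkdir(parents=True)
    (output_dir / "scorecard.pdf").write_bytes(b"%PDF-1.4\n")
    (output_dir / "metadata.json").write_text("{}")
    found = [path.name for path in find_pdfs(demo_root)]

print(found)  # ['scorecard.pdf']
```

After running the download cell, `find_pdfs(target_directory)` would list the scorecard PDF wherever it landed.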