sdk/python/jobs/automl-standalone-jobs/automl-forecasting-forecast-function/auto-ml-forecasting-function-gap-batch-inference.ipynb (716 lines of code) (raw):
{
"cells": [
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Introduction\n",
"This notebook demonstrates the full interface of the `forecast()` function. \n",
"\n",
"The best known and most frequent usage of `forecast` enables forecasting on test sets that immediately follows training data. \n",
"\n",
"However, in many use cases it is necessary to continue using the model for some time before retraining it. This happens especially in **high frequency forecasting** when forecasts need to be made more frequently than the model can be retrained. Examples are in Internet of Things and predictive cloud resource scaling.\n",
"\n",
"Here we show how to use the `forecast()` function when a time gap exists between training data and prediction period.\n",
"\n",
"Terminology:\n",
"* forecast origin: the last period when the target value is known\n",
"* forecast periods(s): the period(s) for which the value of the target is desired.\n",
"* lookback: how many past periods (before forecast origin) the model function depends on. The larger of number of lags and length of rolling window.\n",
"* prediction context: `lookback` periods immediately preceding the forecast origin"
]
},
{
"cell_type": "code",
"execution_count": 9,
"metadata": {
"tags": []
},
"outputs": [],
"source": [
"TIME_COLUMN_NAME = \"date\"\n",
"TIME_SERIES_ID_COLUMN_NAME = \"time_series_id\"\n",
"TARGET_COLUMN_NAME = \"y\"\n",
"lags = [1, 2, 3]\n",
"forecast_horizon = 6"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Batch Deployment"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"import mlflow\n",
"import mlflow.sklearn\n",
"import pandas as pd\n",
"\n",
"from azure.identity import DefaultAzureCredential\n",
"from azure.ai.ml import MLClient\n",
"\n",
"credential = DefaultAzureCredential()\n",
"ml_client = None\n",
"\n",
"subscription_id = \"<SUBSCRIPTION_ID>\"\n",
"resource_group = \"<RESOURCE_GROUP>\"\n",
"workspace = \"<AML_WORKSPACE_NAME>\"\n",
"\n",
"ml_client = MLClient(credential, subscription_id, resource_group, workspace)\n",
"\n",
"# Obtain the tracking URL from MLClient\n",
"MLFLOW_TRACKING_URI = ml_client.workspaces.get(\n",
" name=ml_client.workspace_name\n",
").mlflow_tracking_uri\n",
"\n",
"print(MLFLOW_TRACKING_URI)"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"from mlflow.tracking.client import MlflowClient\n",
"\n",
"# Set the MLFLOW TRACKING URI\n",
"mlflow.set_tracking_uri(MLFLOW_TRACKING_URI)\n",
"print(\"\\nCurrent tracking uri: {}\".format(mlflow.get_tracking_uri()))\n",
"\n",
"# Initialize MLFlow client\n",
"mlflow_client = MlflowClient()"
]
},
{
"cell_type": "code",
"execution_count": 3,
"metadata": {},
"outputs": [],
"source": [
"# job_name = returned_job.name # If training job is in the same notebook\n",
"job_name = \"yellow_camera_1n84g0vcwp\" ## Example of providing an specific Job name/ID\n",
"\n",
"# Get the parent run\n",
"mlflow_parent_run = mlflow_client.get_run(job_name)\n",
"\n",
"# print(\"Parent Run: \")\n",
"# print(mlflow_parent_run)"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Get the best model's child run\n",
"best_child_run_id = mlflow_parent_run.data.tags[\"automl_best_child_run_id\"]\n",
"print(\"Found best child run id: \", best_child_run_id)\n",
"\n",
"best_run = mlflow_client.get_run(best_child_run_id)\n",
"\n",
"print(\"Best child run: \")\n",
"print(best_run)"
]
},
{
"cell_type": "code",
"execution_count": 5,
"metadata": {},
"outputs": [],
"source": [
"import datetime\n",
"from azure.ai.ml.entities import (\n",
" Environment,\n",
" BatchEndpoint,\n",
" BatchDeployment,\n",
" BatchRetrySettings,\n",
" Model,\n",
")\n",
"from azure.ai.ml.constants import BatchDeploymentOutputAction\n",
"\n",
"model_name = \"test-batch-endpoint\"\n",
"batch_endpoint_name = \"gap-batch-\" + datetime.datetime.now().strftime(\"%m%d%H%M%f\")\n",
"\n",
"model = Model(\n",
" path=f\"azureml://jobs/{best_run.info.run_id}/outputs/artifacts/outputs/model.pkl\",\n",
" name=model_name,\n",
" description=\"Gap prediction sample best model\",\n",
")\n",
"registered_model = ml_client.models.create_or_update(model)\n",
"\n",
"env = Environment(\n",
" name=\"automl-tabular-env\",\n",
" description=\"environment for automl inference\",\n",
" image=\"mcr.microsoft.com/azureml/openmpi4.1.0-ubuntu20.04:latest\",\n",
" conda_file=\"artifact_downloads/outputs/conda_env_v_1_0_0.yml\",\n",
")\n",
"\n",
"endpoint = BatchEndpoint(\n",
" name=batch_endpoint_name,\n",
" description=\"this is a sample batch endpoint\",\n",
")\n",
"ml_client.begin_create_or_update(endpoint).wait()"
]
},
{
"cell_type": "code",
"execution_count": 6,
"metadata": {},
"outputs": [],
"source": [
"from azure.core.exceptions import ResourceNotFoundError\n",
"from azure.ai.ml.entities import AmlCompute\n",
"\n",
"cluster_name = \"gap-cluster\"\n",
"\n",
"try:\n",
" # Retrieve an already attached Azure Machine Learning Compute.\n",
" compute = ml_client.compute.get(cluster_name)\n",
"except ResourceNotFoundError as e:\n",
" compute = AmlCompute(\n",
" name=cluster_name,\n",
" size=\"STANDARD_DS12_V2\",\n",
" type=\"amlcompute\",\n",
" min_instances=0,\n",
" max_instances=4,\n",
" idle_time_before_scale_down=120,\n",
" )\n",
" poller = ml_client.begin_create_or_update(compute)\n",
" poller.wait()"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"batch_endpoint_name"
]
},
{
"cell_type": "code",
"execution_count": 10,
"metadata": {},
"outputs": [],
"source": [
"output_file = \"forecast.csv\" # Where the predictions would be stored\n",
"batch_deployment = BatchDeployment(\n",
" name=\"non-mlflow-deployment\",\n",
" description=\"this is a sample non-mlflow deployment\",\n",
" endpoint_name=batch_endpoint_name,\n",
" model=registered_model,\n",
" code_path=\"./forecasting_script\",\n",
" scoring_script=\"forecasting_script.py\",\n",
" environment=env,\n",
" environment_variables={\n",
" \"TARGET_COLUMN_NAME\": TARGET_COLUMN_NAME,\n",
" },\n",
" compute=cluster_name,\n",
" instance_count=1, # 2\n",
" max_concurrency_per_instance=1, # 2\n",
" mini_batch_size=1, # 10\n",
" output_action=BatchDeploymentOutputAction.APPEND_ROW,\n",
" output_file_name=output_file,\n",
" retry_settings=BatchRetrySettings(max_retries=3, timeout=30),\n",
" logging_level=\"info\",\n",
" properties={\"include_output_header\": \"true\"},\n",
" tags={\"include_output_header\": \"true\"},\n",
")"
]
},
{
"cell_type": "code",
"execution_count": 11,
"metadata": {},
"outputs": [],
"source": [
"ml_client.begin_create_or_update(batch_deployment).wait()"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"batch_endpoint_name"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Data visualization of the train and test data"
]
},
{
"cell_type": "code",
"execution_count": 13,
"metadata": {},
"outputs": [],
"source": [
"# We stored the training and test data during training\n",
"df_train = pd.read_parquet(\"./data/training-mltable-folder/df_train.parquet\")\n",
"df_test = pd.read_parquet(\"./data/testing-mltable-folder/df_test.parquet\")"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"df_train.tail(2)"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"df_test.head(2)"
]
},
{
"cell_type": "code",
"execution_count": 17,
"metadata": {},
"outputs": [],
"source": [
"# The above test data follows the training data\n",
"# Store in folder for the batch endpoint to use as parquet file\n",
"df_test.to_parquet(\"./data/test_data_scenarios/df_test_scenario1.parquet\")"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"import matplotlib.pyplot as plt\n",
"\n",
"# Concatenate the training and testing DataFrames\n",
"df_plot = pd.concat([df_train, df_test])\n",
"\n",
"# Create a figure and axis\n",
"plt.figure(figsize=(10, 6))\n",
"ax = plt.gca() # Get current axis\n",
"\n",
"# Group by both 'data_type' and 'time_series_id'\n",
"for (data_type, time_series_id), df in df_plot.groupby(\n",
" [\"data_type\", TIME_SERIES_ID_COLUMN_NAME]\n",
"):\n",
" df.plot(\n",
" x=\"date\",\n",
" y=TARGET_COLUMN_NAME,\n",
" label=f\"{data_type} - {time_series_id}\",\n",
" ax=ax,\n",
" legend=False,\n",
" )\n",
"\n",
"# Customize the plot\n",
"plt.xlabel(\"Date\")\n",
"plt.ylabel(\"Value\")\n",
"plt.title(\"Train and Test Data\")\n",
"\n",
"# Manually create the legend after plotting\n",
"plt.legend(title=\"Data Type and Time Series ID\")\n",
"plt.show()"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"# Forecasting from the trained model\n",
"\n",
"In this section we will review the forecast interface for two main scenarios: forecasting right after the training data, and the more complex interface for forecasting when there is a gap (in the time sense) between training and testing data.\n",
"\n",
"## X_train is directly followed by the X_test\n",
"Let's first consider the case when the prediction period immediately follows the training data. This is typical in scenarios where we have the time to retrain the model every time we wish to forecast. Forecasts that are made on daily and slower cadence typically fall into this category. Retraining the model every time benefits the accuracy because the most recent data is often the most informative.\n",
"\n",
"\n",
"<img src=\"./images/forecast_function_at_train.png\" alt=\"Description\" width=\"50%\">\n",
"\n",
"We use X_test as a forecast request to generate the predictions."
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Get the test data for which we need the prediction"
]
},
{
"cell_type": "code",
"execution_count": 18,
"metadata": {},
"outputs": [],
"source": [
"from azure.ai.ml import Input\n",
"from azure.ai.ml.constants import AssetTypes\n",
"\n",
"my_test_data_input = Input(\n",
" type=AssetTypes.URI_FOLDER,\n",
" path=\"./data/test_data_scenarios\",\n",
")"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"my_test_data_input"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Invoke the endpoint with the test data"
]
},
{
"cell_type": "code",
"execution_count": 27,
"metadata": {},
"outputs": [],
"source": [
"job = ml_client.batch_endpoints.invoke(\n",
" endpoint_name=batch_endpoint_name,\n",
" input=my_test_data_input, # Test data input\n",
" deployment_name=\"non-mlflow-deployment\", # name is required as default deployment is not set\n",
")"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"job_name = job.name\n",
"batch_job = ml_client.jobs.get(name=job_name)\n",
"print(batch_job.status)\n",
"# stream the job logs\n",
"ml_client.jobs.stream(name=job_name)"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"job_name, \" \", output_file"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Get the predictions\n",
"download_path = \"./outputs/\"\n",
"ml_client.jobs.download(job_name, download_path=download_path)"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"fcst_df = pd.read_csv(download_path + output_file, parse_dates=[TIME_COLUMN_NAME])\n",
"fcst_df.head()"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"# Forecasting away from training data\n",
"Suppose we trained a model, some time passed, and now we want to apply the model without re-training. If the model \"looks back\" -- uses previous values of the target -- then we somehow need to provide those values to the model.\n",
"\n",
"<img src=\"./images/forecast_function_away_from_train.png\" alt=\"Description\" width=\"50%\">\n",
"\n",
"The notion of forecast origin comes into play: **the forecast origin is the last period for which we have seen the target value.** This applies per time-series, so each time-series can have a different forecast origin.\n",
"\n",
"The part of data before the forecast origin is the **prediction context**. To provide the context values the model needs when it looks back, we pass definite values in y_test (aligned with corresponding times in X_test)."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Generate the same kind of test data we trained on, but now make the train set much longer, so that the test set will be in the future\n",
"from helper import get_timeseries, make_forecasting_query\n",
"\n",
"X_context, y_context, X_away, y_away = get_timeseries(\n",
" train_len=42, # train data was 30 steps long\n",
" test_len=4,\n",
" time_column_name=TIME_COLUMN_NAME,\n",
" target_column_name=TARGET_COLUMN_NAME,\n",
" time_series_id_column_name=TIME_SERIES_ID_COLUMN_NAME,\n",
" time_series_number=2,\n",
")\n",
"\n",
"print(\"End of the data we trained on:\")\n",
"print(df_train.groupby(TIME_SERIES_ID_COLUMN_NAME)[TIME_COLUMN_NAME].max())\n",
"\n",
"print(\"\\nStart of the data we want to predict on:\")\n",
"print(X_away.groupby(TIME_SERIES_ID_COLUMN_NAME)[TIME_COLUMN_NAME].min())"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"There is a gap of 12 hours between end of training and beginning of X_away. (It looks like 13 because all timestamps point to the start of the one hour periods.) Using only X_away will fail without adding context data for the model to consume"
]
},
{
"cell_type": "code",
"execution_count": 40,
"metadata": {},
"outputs": [],
"source": [
"x_gap_test = X_away.copy()\n",
"x_gap_test[\"y\"] = y_away\n",
"x_gap_test[\"data_type\"] = \"test\" # Dummy data\n",
"\n",
"x_gap_test.to_csv(\"./data/test_gap_scenario/gap_test_data.csv\")"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Since the length of the lookback is 3, we need to add 3 periods from the context to the request so that the model has the data it needs\n",
"\n",
"# Put the X and y back together for a while. They like each other and it makes them happy.\n",
"X_context[TARGET_COLUMN_NAME] = y_context\n",
"X_away[TARGET_COLUMN_NAME] = y_away\n",
"fulldata = pd.concat([X_context, X_away])\n",
"\n",
"# Forecast origin is the last point of data, which is one 1-hr period before test\n",
"forecast_origin = X_away[TIME_COLUMN_NAME].min() - pd.DateOffset(hours=1)\n",
"# it is indeed the last point of the context\n",
"assert forecast_origin == X_context[TIME_COLUMN_NAME].max()\n",
"print(\"Forecast origin: \" + str(forecast_origin))\n",
"\n",
"# The model uses lags and rolling windows to look back in time\n",
"n_lookback_periods = max(\n",
" lags\n",
") # n_lookback_periods = max(max(lags), forecast_horizon) # If target_rolling_window_size is used\n",
"lookback = pd.DateOffset(hours=n_lookback_periods)\n",
"horizon = pd.DateOffset(hours=forecast_horizon)"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Now make the forecast query from context. This is the main thing for predicting gap data\n",
"from helper import make_forecasting_query\n",
"\n",
"X_pred, y_pred = make_forecasting_query(\n",
" fulldata, TIME_COLUMN_NAME, TARGET_COLUMN_NAME, forecast_origin, horizon, lookback\n",
")\n",
"\n",
"# show the forecast request aligned\n",
"X_show = X_pred.copy()\n",
"X_show[TARGET_COLUMN_NAME] = y_pred\n",
"X_show[X_show[\"time_series_id\"] == \"ts0\"]"
]
},
{
"cell_type": "code",
"execution_count": 51,
"metadata": {},
"outputs": [],
"source": [
"X_pred[\n",
" \"data_type\"\n",
"] = \"unknown\" # Our trining had an additional column called data_type, hence, adding it"
]
},
{
"cell_type": "code",
"execution_count": 54,
"metadata": {},
"outputs": [],
"source": [
"gap_data = X_pred.copy()\n",
"gap_data[TARGET_COLUMN_NAME] = y_pred\n",
"gap_data.to_csv(\"./data/test_gap_scenario/gap_data_with_context.csv\")"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"y_pred"
]
},
{
"cell_type": "code",
"execution_count": 55,
"metadata": {},
"outputs": [],
"source": [
"my_test_data_gap_input = Input(\n",
" type=AssetTypes.URI_FOLDER,\n",
" path=\"./data/test_gap_scenario/\", # Path to the data folder that has the test data with gap\n",
")"
]
},
{
"cell_type": "code",
"execution_count": 61,
"metadata": {},
"outputs": [],
"source": [
"gap_job = ml_client.batch_endpoints.invoke(\n",
" endpoint_name=batch_endpoint_name,\n",
" input=my_test_data_gap_input, # Test data input\n",
" deployment_name=\"non-mlflow-deployment\", # name is required as default deployment is not set\n",
")"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"job_name = gap_job.name\n",
"batch_job = ml_client.jobs.get(name=job_name)\n",
"print(batch_job.status)\n",
"# stream the job logs\n",
"ml_client.jobs.stream(name=job_name)"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Get the predictions\n",
"gap_download_path = \"./outputs/gap_scenario/\"\n",
"ml_client.jobs.download(job_name, download_path=gap_download_path)"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"gap_fcst_df = pd.read_csv(\n",
" gap_download_path + output_file, parse_dates=[TIME_COLUMN_NAME]\n",
")\n",
"gap_fcst_df"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"gap_data"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# show the forecast aligned without the generated features\n",
"X_show = gap_fcst_df[gap_fcst_df[\"data_type\"] == \"test\"]\n",
"X_show"
]
}
],
"metadata": {
"authors": [
{
"name": "jialiu"
}
],
"category": "tutorial",
"compute": [
"Remote"
],
"datasets": [
"None"
],
"deployment": [
"None"
],
"exclude_from_index": false,
"framework": [
"Azure ML AutoML"
],
"friendly_name": "Forecasting away from training data",
"index_order": 3,
"kernelspec": {
"display_name": "sdkv2-test1",
"language": "python",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.9.20"
},
"microsoft": {
"ms_spell_check": {
"ms_spell_check_language": "en"
}
},
"nteract": {
"version": "nteract-front-end@1.0.0"
},
"tags": [
"Forecasting",
"Confidence Intervals"
],
"task": "Forecasting"
},
"nbformat": 4,
"nbformat_minor": 4
}