## Introduction
This notebook demonstrates the full interface of the `forecast()` function. 

The best-known and most frequent use of `forecast` is forecasting on a test set that immediately follows the training data.

However, in many use cases it is necessary to continue using the model for some time before retraining it. This happens especially in **high frequency forecasting**, when forecasts need to be made more frequently than the model can be retrained. Examples include Internet of Things (IoT) applications and predictive scaling of cloud resources.

Here we show how to use the `forecast()` function when a time gap exists between training data and prediction period.

Terminology:
* forecast origin: the last period when the target value is known
* forecast period(s): the period(s) for which the value of the target is desired.
* lookback: how many past periods (before the forecast origin) the model depends on; this is the larger of the maximum lag and the length of the rolling window.
* prediction context: `lookback` periods immediately preceding the forecast origin

In [9]:
# Column names shared by the training data, the scoring requests and the batch output.
TIME_COLUMN_NAME = "date"
TIME_SERIES_ID_COLUMN_NAME = "time_series_id"
TARGET_COLUMN_NAME = "y"

# Target lag orders and forecast horizon, both in periods (hours here).
# Presumably these must match the settings the model was trained with — TODO confirm.
forecast_horizon = 6
lags = list(range(1, 4))

### Batch Deployment

In [None]:
import mlflow
import mlflow.sklearn
import pandas as pd

from azure.identity import DefaultAzureCredential
from azure.ai.ml import MLClient

import os

credential = DefaultAzureCredential()

# Workspace coordinates. Read them from the environment when set, so real IDs need not be
# hard-coded in a shared notebook; otherwise fall back to placeholders to be filled in by hand.
subscription_id = os.environ.get("AZURE_SUBSCRIPTION_ID", "<SUBSCRIPTION_ID>")
resource_group = os.environ.get("AZURE_RESOURCE_GROUP", "<RESOURCE_GROUP>")
workspace = os.environ.get("AZURE_ML_WORKSPACE", "<AML_WORKSPACE_NAME>")

ml_client = MLClient(credential, subscription_id, resource_group, workspace)

# Obtain the workspace's MLflow tracking URI from MLClient
MLFLOW_TRACKING_URI = ml_client.workspaces.get(
    name=ml_client.workspace_name
).mlflow_tracking_uri

print(MLFLOW_TRACKING_URI)

In [None]:
from mlflow.tracking.client import MlflowClient

# Route all MLflow calls to the workspace tracking server obtained above.
mlflow.set_tracking_uri(MLFLOW_TRACKING_URI)
current_uri = mlflow.get_tracking_uri()
print("\nCurrent tracking uri: {}".format(current_uri))

# Created after set_tracking_uri: MlflowClient() without arguments uses the current tracking URI.
mlflow_client = MlflowClient()

In [3]:
# Name/ID of the AutoML training job whose best model is deployed below.
# If the training job was submitted from this notebook, use `returned_job.name` instead.
job_name = "yellow_camera_1n84g0vcwp"  # example of a specific job name/ID; replace with your own

# The AutoML job is the MLflow parent run; its trials are child runs (see the tag read next).
mlflow_parent_run = mlflow_client.get_run(job_name)

In [None]:
# Get the best model's child run
best_child_run_id = mlflow_parent_run.data.tags["automl_best_child_run_id"]
print("Found best child run id: ", best_child_run_id)

best_run = mlflow_client.get_run(best_child_run_id)

print("Best child run: ")
print(best_run)

In [5]:
import datetime
from azure.ai.ml.entities import (
    Environment,
    BatchEndpoint,
    BatchDeployment,
    BatchRetrySettings,
    Model,
)
from azure.ai.ml.constants import BatchDeploymentOutputAction

import os

model_name = "test-batch-endpoint"
# Timestamp suffix (month, day, hour, minute, microseconds) makes the endpoint name unique per run.
batch_endpoint_name = "gap-batch-" + datetime.datetime.now().strftime("%m%d%H%M%f")

# Register the pickled model from the best child run as a workspace model asset.
model = Model(
    path=f"azureml://jobs/{best_run.info.run_id}/outputs/artifacts/outputs/model.pkl",
    name=model_name,
    description="Gap prediction sample best model",
)
registered_model = ml_client.models.create_or_update(model)

# The inference environment reuses the conda spec stored in the best run's "outputs" artifacts.
# Nothing earlier in the notebook downloads it, so fetch it here unless it is already present.
# This lets the cell run on a fresh kernel or checkout.
artifact_dir = "artifact_downloads"
conda_file = "artifact_downloads/outputs/conda_env_v_1_0_0.yml"
if not os.path.exists(conda_file):
    os.makedirs(artifact_dir, exist_ok=True)
    mlflow_client.download_artifacts(best_run.info.run_id, "outputs", artifact_dir)

env = Environment(
    name="automl-tabular-env",
    description="environment for automl inference",
    image="mcr.microsoft.com/azureml/openmpi4.1.0-ubuntu20.04:latest",
    conda_file=conda_file,
)

# Create the (initially deployment-less) batch endpoint and block until it exists.
endpoint = BatchEndpoint(
    name=batch_endpoint_name,
    description="this is a sample batch endpoint",
)
ml_client.begin_create_or_update(endpoint).wait()

In [6]:
from azure.core.exceptions import ResourceNotFoundError
from azure.ai.ml.entities import AmlCompute

cluster_name = "gap-cluster"

try:
    # Reuse the compute cluster if it already exists in the workspace.
    compute = ml_client.compute.get(cluster_name)
except ResourceNotFoundError:
    # Otherwise provision a cluster that scales down to zero nodes after 120 s of idleness.
    compute = AmlCompute(
        name=cluster_name,
        size="STANDARD_DS12_V2",
        type="amlcompute",
        min_instances=0,
        max_instances=4,
        idle_time_before_scale_down=120,
    )
    # result() waits for provisioning to finish and returns the created resource.
    # In both branches, `compute` then refers to the workspace object, not just the local spec.
    compute = ml_client.begin_create_or_update(compute).result()

In [None]:
# Display the generated endpoint name; it is needed again when invoking the endpoint.
batch_endpoint_name

In [10]:
# File name under which the batch job's appended predictions are written.
output_file = "forecast.csv"

# Each failed mini-batch is retried up to 3 times, with a 30 s timeout.
retry_settings = BatchRetrySettings(max_retries=3, timeout=30)

batch_deployment = BatchDeployment(
    name="non-mlflow-deployment",
    description="this is a sample non-mlflow deployment",
    endpoint_name=batch_endpoint_name,
    model=registered_model,
    # Custom (non-MLflow) scoring code: the folder and entry script are given explicitly.
    code_path="./forecasting_script",
    scoring_script="forecasting_script.py",
    environment=env,
    # Presumably read by forecasting_script.py (not shown) to locate the target column — confirm.
    environment_variables={"TARGET_COLUMN_NAME": TARGET_COLUMN_NAME},
    compute=cluster_name,
    # Single node, single worker, one input file per mini-batch.
    instance_count=1,
    max_concurrency_per_instance=1,
    mini_batch_size=1,
    # Rows returned by every mini-batch are appended into one output file.
    output_action=BatchDeploymentOutputAction.APPEND_ROW,
    output_file_name=output_file,
    retry_settings=retry_settings,
    logging_level="info",
    # NOTE(review): presumably asks for a header row in the appended output — confirm.
    properties={"include_output_header": "true"},
    tags={"include_output_header": "true"},
)

In [11]:
# Create the deployment under the endpoint and block until the operation completes.
ml_client.begin_create_or_update(batch_deployment).wait()

In [None]:
# Display the endpoint name once more, now that the deployment is attached to it.
batch_endpoint_name

### Data visualization of the train and test data

In [13]:
# We stored the training and test data during training
# We stored the training and test data during training; reload both splits from parquet.
df_train = pd.read_parquet("./data/training-mltable-folder/df_train.parquet")
df_test = pd.read_parquet("./data/testing-mltable-folder/df_test.parquet")

In [None]:
# Last rows of the training data: the test data should start right after these timestamps.
df_train.tail(2)

In [None]:
# First rows of the test data, for comparison with the training tail above.
df_test.head(2)

In [17]:
import os

# The test data above directly follows the training data, so it can be scored as-is.
# Store it as parquet in the folder the batch endpoint will read. Create the folder first,
# because to_parquet does not create missing directories.
test_scenarios_dir = "./data/test_data_scenarios"
os.makedirs(test_scenarios_dir, exist_ok=True)
df_test.to_parquet(test_scenarios_dir + "/df_test_scenario1.parquet")

In [None]:
import matplotlib.pyplot as plt

# Train and test rows in one frame; the "data_type" column tells them apart.
df_plot = pd.concat([df_train, df_test])

fig, ax = plt.subplots(figsize=(10, 6))

# One line per (data_type, series) pair, so train and test segments get separate labels.
for (data_type, time_series_id), df_group in df_plot.groupby(
    ["data_type", TIME_SERIES_ID_COLUMN_NAME]
):
    df_group.plot(
        x=TIME_COLUMN_NAME,
        y=TARGET_COLUMN_NAME,
        label=f"{data_type} - {time_series_id}",
        ax=ax,
        legend=False,  # the legend is built once below, after all lines exist
    )

ax.set(xlabel="Date", ylabel="Value", title="Train and Test Data")
ax.legend(title="Data Type and Time Series ID")
plt.show()

# Forecasting from the trained model

In this section we will review the forecast interface for two main scenarios: forecasting right after the training data, and the more complex interface for forecasting when there is a gap (in the time sense) between training and testing data.

## X_train is directly followed by X_test
Let's first consider the case where the prediction period immediately follows the training data. This is typical in scenarios where we have time to retrain the model every time we wish to forecast. Forecasts made at a daily or slower cadence typically fall into this category. Retraining the model every time benefits accuracy, because the most recent data is often the most informative.


<img src="./images/forecast_function_at_train.png" alt="Forecast period immediately following the training data" width="50%">

We use X_test as a forecast request to generate the predictions.

### Get the test data for which we need the prediction

In [18]:
from azure.ai.ml import Input
from azure.ai.ml.constants import AssetTypes

# Folder input for the batch job: the files in ./data/test_data_scenarios are what get scored.
my_test_data_input = Input(
    type=AssetTypes.URI_FOLDER,
    path="./data/test_data_scenarios",
)

In [None]:
# Display the input definition that will be passed to the endpoint.
my_test_data_input

### Invoke the endpoint with the test data

In [27]:
# Submit a batch scoring job over the test folder.
# NOTE(review): newer azure-ai-ml releases document `inputs={...}` in place of `input=`;
# confirm against the installed SDK version.
job = ml_client.batch_endpoints.invoke(
    endpoint_name=batch_endpoint_name,
    input=my_test_data_input,  # Test data input
    deployment_name="non-mlflow-deployment",  # name is required as default deployment is not set
)

In [None]:
# Note: this rebinds job_name from the training job to the batch scoring job.
job_name = job.name
batch_job = ml_client.jobs.get(name=job_name)
print(batch_job.status)
# stream the job logs (blocks until the job finishes)
ml_client.jobs.stream(name=job_name)

In [None]:
# Show the batch job name alongside the output file name that is read back below.
job_name, " ", output_file

In [None]:
# Get the predictions
# Get the predictions: download the batch job's output into ./outputs/.
# The next cell expects the file at download_path + output_file.
download_path = "./outputs/"
ml_client.jobs.download(job_name, download_path=download_path)

In [None]:
# Read the appended predictions back; the time column is parsed as datetimes.
fcst_df = pd.read_csv(download_path + output_file, parse_dates=[TIME_COLUMN_NAME])
fcst_df.head()

# Forecasting away from training data
Suppose we trained a model, some time passed, and now we want to apply the model without re-training. If the model "looks back" -- uses previous values of the target -- then we somehow need to provide those values to the model.

<img src="./images/forecast_function_away_from_train.png" alt="Forecast period separated from the training data by a gap" width="50%">

The notion of forecast origin comes into play: **the forecast origin is the last period for which we have seen the target value.** This applies per time-series, so each time-series can have a different forecast origin.

The part of the data before the forecast origin is the **prediction context**. To provide the context values the model needs when it looks back, we pass the known target values in y_test (aligned with the corresponding times in X_test).

In [None]:
# Generate the same kind of test data we trained on, but now make the train set much longer, so that the test set will be in the future
from helper import get_timeseries, make_forecasting_query

X_context, y_context, X_away, y_away = get_timeseries(
    train_len=42,  # 12 periods longer than the 30-period training data, creating the gap
    test_len=4,
    time_column_name=TIME_COLUMN_NAME,
    target_column_name=TARGET_COLUMN_NAME,
    time_series_id_column_name=TIME_SERIES_ID_COLUMN_NAME,
    time_series_number=2,
)

# Compare, per series, where training ended with where the new request starts.
last_train_time = df_train.groupby(TIME_SERIES_ID_COLUMN_NAME)[TIME_COLUMN_NAME].max()
print("End of the data we trained on:")
print(last_train_time)

first_away_time = X_away.groupby(TIME_SERIES_ID_COLUMN_NAME)[TIME_COLUMN_NAME].min()
print("\nStart of the data we want to predict on:")
print(first_away_time)

There is a gap of 12 hours between the end of the training data and the beginning of X_away. (It looks like 13 because every timestamp points to the start of its one-hour period.) Scoring X_away on its own will fail, because the model has no context data to look back on.

In [40]:
import os

# Request built from X_away alone, i.e. without any prediction context.
x_gap_test = X_away.copy()
x_gap_test[TARGET_COLUMN_NAME] = y_away
x_gap_test["data_type"] = "test"  # Dummy value; the training data had a data_type column

gap_scenario_dir = "./data/test_gap_scenario"
os.makedirs(gap_scenario_dir, exist_ok=True)  # to_csv does not create missing folders
# NOTE(review): the endpoint is later invoked on this whole folder, so this no-context file is
# scored as well. Per the markdown above, forecasting X_away alone is expected to fail.
# Confirm that this file is meant to be part of the batch input.
x_gap_test.to_csv(gap_scenario_dir + "/gap_test_data.csv")

In [None]:
# The lookback is 3 periods (the largest lag), so the request must include 3 periods of
# context before the forecast origin. That gives the model the data it needs.

# Re-attach the targets to build one frame to slice the request from. assign() returns new
# frames, so X_context and X_away themselves are left unchanged.
X_context_y = X_context.assign(**{TARGET_COLUMN_NAME: y_context})
X_away_y = X_away.assign(**{TARGET_COLUMN_NAME: y_away})
fulldata = pd.concat([X_context_y, X_away_y])

# The forecast origin is the last period with a known target: one 1-hour period before X_away starts.
forecast_origin = X_away[TIME_COLUMN_NAME].min() - pd.DateOffset(hours=1)
# ...which is indeed the last period of the context data.
assert forecast_origin == X_context[TIME_COLUMN_NAME].max()
print("Forecast origin: " + str(forecast_origin))

# The model looks back in time through its lags (and a target rolling window, if one was used).
# With lags only, the lookback is the largest lag. If target_rolling_window_size was set during
# training, use max(max(lags), target_rolling_window_size) instead.
n_lookback_periods = max(lags)
lookback = pd.DateOffset(hours=n_lookback_periods)
horizon = pd.DateOffset(hours=forecast_horizon)

In [None]:
# Now make the forecast query from context. This is the main thing for predicting gap data
from helper import make_forecasting_query

# Slice the request (lookback context + forecast horizon around forecast_origin) out of fulldata.
# The exact slicing is done by helper.make_forecasting_query (not shown here).
X_pred, y_pred = make_forecasting_query(
    fulldata, TIME_COLUMN_NAME, TARGET_COLUMN_NAME, forecast_origin, horizon, lookback
)

# Show the forecast request for one series with its target values aligned.
X_show = X_pred.copy()
X_show[TARGET_COLUMN_NAME] = y_pred
X_show[X_show[TIME_SERIES_ID_COLUMN_NAME] == "ts0"]

In [51]:
# The training data had an additional "data_type" column, so the request must carry one as well.
X_pred["data_type"] = "unknown"

In [54]:
import os

# Batch request with prediction context: X_pred plus its aligned target values.
gap_data = X_pred.copy()
gap_data[TARGET_COLUMN_NAME] = y_pred

gap_scenario_dir = "./data/test_gap_scenario"
os.makedirs(gap_scenario_dir, exist_ok=True)  # to_csv does not create missing folders
gap_data.to_csv(gap_scenario_dir + "/gap_data_with_context.csv")

In [None]:
# Inspect the target values sent with the request: known values for the context periods.
# Presumably the forecast periods are empty — see helper.make_forecasting_query to confirm.
y_pred

In [55]:
# NOTE(review): this folder also holds gap_test_data.csv (written without prediction context),
# so the batch job scores both files. Only gap_data_with_context.csv carries the context the model needs.
my_test_data_gap_input = Input(
    type=AssetTypes.URI_FOLDER,
    path="./data/test_gap_scenario/",  # Path to the data folder that has the test data with gap
)

In [61]:
# Submit a batch scoring job over the gap-scenario folder.
# NOTE(review): newer azure-ai-ml releases document `inputs={...}` in place of `input=`;
# confirm against the installed SDK version.
gap_job = ml_client.batch_endpoints.invoke(
    endpoint_name=batch_endpoint_name,
    input=my_test_data_gap_input,  # Test data input
    deployment_name="non-mlflow-deployment",  # name is required as default deployment is not set
)

In [None]:
# Note: this rebinds job_name to the gap-scenario batch scoring job.
job_name = gap_job.name
batch_job = ml_client.jobs.get(name=job_name)
print(batch_job.status)
# stream the job logs (blocks until the job finishes)
ml_client.jobs.stream(name=job_name)

In [None]:
# Get the predictions
# Get the predictions: download the gap job's output into its own folder, so the
# first scenario's forecast file is not overwritten.
gap_download_path = "./outputs/gap_scenario/"
ml_client.jobs.download(job_name, download_path=gap_download_path)

In [None]:
# Read the appended predictions of the gap job; the time column is parsed as datetimes.
gap_fcst_df = pd.read_csv(
    gap_download_path + output_file, parse_dates=[TIME_COLUMN_NAME]
)
gap_fcst_df

In [None]:
# The context-augmented request that was sent, for comparison with the forecast above.
gap_data

In [None]:
# show the forecast aligned without the generated features
# Show the forecast for the context-augmented request only.
# - Its rows carry data_type "unknown" (set on X_pred).
# - "test" rows come from gap_test_data.csv, which has no prediction context.
# Context rows at or before the forecast origin are dropped, leaving only the forecast periods.
# NOTE(review): assumes the scoring script echoes the input columns (incl. data_type) — confirm.
from_context_request = gap_fcst_df["data_type"] == "unknown"
after_origin = gap_fcst_df[TIME_COLUMN_NAME] > forecast_origin
X_show = gap_fcst_df[from_context_request & after_origin]
X_show