backend/time-series-forecasting/training_methods/automl_training_method.py (266 lines of code) (raw):

# Copyright 2022 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from tokenize import Double
from typing import Any, Dict, List, Optional

import pandas as pd
import numpy as np
from google.cloud import aiplatform, bigquery
from google.cloud.aiplatform import models

import utils
from models import dataset, forecast_job_request
from training_methods import training_method
import constants

# Keys looked up in the ``model_parameters`` / ``prediction_parameters`` dicts.
TIME_COLUMN_PARAMETER = "timeColumn"
TARGET_COLUMN_PARAMETER = "targetColumn"
TIME_SERIES_IDENTIFIER_COLUMN_PARAMETER = "timeSeriesIdentifierColumn"
FORECAST_HORIZON_PARAMETER = "forecastHorizon"
DATA_GRANULARITY_UNIT_PARAMETER = "dataGranularityUnit"
DATA_GRANULARITY_COUNT_PARAMETER = "dataGranularityCount"
OPTIMIZATION_OBJECTIVE_PARAMETER = "optimizationObjective"
COLUMN_SPECS_PARAMETER = "columnSpecs"
TIME_SERIES_ATTRIBUTE_COLUMNS_PARAMETER = "timeSeriesAttributeColumns"
CONTEXT_WINDOW_PARAMETER = "contextWindow"


def _require_parameter(parameters: Dict[str, Any], name: str) -> Any:
    """Return ``parameters[name]``, rejecting a missing or ``None`` value.

    Args:
        parameters (Dict[str, Any]): The parameter dictionary to read from.
        name (str): The key to look up.

    Returns:
        Any: The value stored under ``name``.

    Raises:
        ValueError: If ``name`` is absent or its value is ``None``.
    """
    value = parameters.get(name)
    if value is None:
        raise ValueError(f"Missing argument: {name}")
    return value


class AutoMLForecastingTrainingMethod(training_method.TrainingMethod):
    """Training method that trains, evaluates and predicts with Vertex AI
    AutoML Forecasting, exchanging data through BigQuery tables."""

    def __init__(self) -> None:
        super().__init__()
        aiplatform.init()

    @property
    def id(self) -> str:
        """A unique id representing this training method.

        Returns:
            str: The id
        """
        return "automl-forecasting"

    @property
    def display_name(self) -> str:
        """A display_name representing this training method.

        Returns:
            str: The name
        """
        return "Vertex AI AutoML Forecasting"

    def dataset_time_series_identifier_column(
        self, job_request: forecast_job_request.ForecastJobRequest
    ) -> str:
        """The column representing the time series identifier variable in the
        dataset dataframe.

        Args:
            job_request (forecast_job_request.ForecastJobRequest): The request
                whose ``model_parameters`` hold the column name.

        Returns:
            str: The column name

        Raises:
            KeyError: If the parameter is not present in ``model_parameters``.
        """
        return job_request.model_parameters[TIME_SERIES_IDENTIFIER_COLUMN_PARAMETER]

    def dataset_time_column(
        self, job_request: forecast_job_request.ForecastJobRequest
    ) -> str:
        """The column representing the time variable in the dataset dataframe.

        Args:
            job_request (forecast_job_request.ForecastJobRequest): The request
                whose ``model_parameters`` hold the column name.

        Returns:
            str: The column name

        Raises:
            KeyError: If the parameter is not present in ``model_parameters``.
        """
        return job_request.model_parameters[TIME_COLUMN_PARAMETER]

    def dataset_target_column(
        self, job_request: forecast_job_request.ForecastJobRequest
    ) -> str:
        """The column representing the target variable in the dataset dataframe.

        Args:
            job_request (forecast_job_request.ForecastJobRequest): The request
                whose ``model_parameters`` hold the column name.

        Returns:
            str: The column name

        Raises:
            KeyError: If the parameter is not present in ``model_parameters``.
        """
        return job_request.model_parameters[TARGET_COLUMN_PARAMETER]

    def train(
        self,
        dataset: dataset.Dataset,
        model_parameters: Dict[str, Any],
        prediction_parameters: Dict[str, Any],
    ) -> str:
        """Train a job and return the model URI.

        Args:
            dataset (dataset.Dataset): Input dataset.
            model_parameters (Dict[str, Any]): The model training parameters.
            prediction_parameters (Dict[str, Any]): The prediction parameters.

        Returns:
            str: The model resource name

        Raises:
            ValueError: If a required parameter is missing or ``None``.
        """
        # Required parameters; validated in a fixed order so the first missing
        # one is the one reported.
        time_column = _require_parameter(model_parameters, TIME_COLUMN_PARAMETER)
        target_column = _require_parameter(model_parameters, TARGET_COLUMN_PARAMETER)
        time_series_id_column = _require_parameter(
            model_parameters, TIME_SERIES_IDENTIFIER_COLUMN_PARAMETER
        )
        forecast_horizon = _require_parameter(
            prediction_parameters, FORECAST_HORIZON_PARAMETER
        )
        data_granularity_unit = _require_parameter(
            model_parameters, DATA_GRANULARITY_UNIT_PARAMETER
        )
        data_granularity_count = _require_parameter(
            model_parameters, DATA_GRANULARITY_COUNT_PARAMETER
        )

        # Optional parameters; ``None`` is forwarded as-is.
        optimization_objective = model_parameters.get(OPTIMIZATION_OBJECTIVE_PARAMETER)
        column_specs = model_parameters.get(COLUMN_SPECS_PARAMETER)
        time_series_attribute_columns = model_parameters.get(
            TIME_SERIES_ATTRIBUTE_COLUMNS_PARAMETER
        )

        # Start training
        model = self._train(
            dataset=dataset,
            time_column=time_column,
            target_column=target_column,
            time_series_id_column=time_series_id_column,
            forecast_horizon=forecast_horizon,
            data_granularity_unit=data_granularity_unit,
            data_granularity_count=data_granularity_count,
            optimization_objective=optimization_objective,
            column_specs=column_specs,
            time_series_attribute_columns=time_series_attribute_columns,
        )

        return model.resource_name

    def evaluate(self, model: str) -> str:
        """Evaluate a model and return the BigQuery table ID to its evaluation table.

        Args:
            model (str): Model to evaluate.

        Returns:
            str: The BigQuery evaluation table ID.

        Raises:
            ValueError: If the model has no evaluation data.
        """
        return self._evaluate(model_name=model)

    def predict(
        self,
        dataset: dataset.Dataset,
        model: str,
        model_parameters: Dict[str, Any],
        prediction_parameters: Dict[str, Any],
    ) -> str:
        """Predict using a model and return the BigQuery table ID to its prediction table.

        Args:
            dataset (dataset.Dataset): Input dataset.
            model (str): Model to evaluate.
            model_parameters (Dict[str, Any]): The model training parameters.
            prediction_parameters (Dict[str, Any]): The prediction parameters.

        Returns:
            str: The BigQuery prediction table view ID.

        Raises:
            ValueError: If a required parameter is missing or ``None``, or if
                no prediction table id was produced.
        """
        forecast_horizon = _require_parameter(
            prediction_parameters, FORECAST_HORIZON_PARAMETER
        )
        context_window = _require_parameter(
            prediction_parameters, CONTEXT_WINDOW_PARAMETER
        )
        time_column = _require_parameter(model_parameters, TIME_COLUMN_PARAMETER)
        time_series_id_column = _require_parameter(
            model_parameters, TIME_SERIES_IDENTIFIER_COLUMN_PARAMETER
        )
        target_column_name = _require_parameter(
            model_parameters, TARGET_COLUMN_PARAMETER
        )

        # Get test data BigQuery source uri
        test_bq_source_id = dataset.get_bigquery_table_id(
            time_column=time_column, dataset_portion="test"
        )

        processed_dataset_bq_source = self._prepare_test_dataset(
            context_window=context_window,
            forecast_horizon=forecast_horizon,
            model_parameters=model_parameters,
            bigquery_source=f"bq://{test_bq_source_id}",
        )

        prediction_table_id = self._predict(
            model_name=model, bigquery_source=processed_dataset_bq_source
        )

        if prediction_table_id is None:
            raise ValueError("Prediction table id is null!")

        # The table id is dot-separated; the second component is taken as the
        # dataset so the view is created next to the prediction table.
        prediction_dataset_id = prediction_table_id.split(".")[1]

        # Create a view and rename column names in the prediction table
        client = bigquery.Client()
        view_id = f"{client.project}.{prediction_dataset_id}.{utils.generate_uuid()}"
        view = bigquery.Table(view_id)
        view.view_query = f"""
            SELECT
                {time_series_id_column} as {constants.FORECAST_TIME_SERIES_IDENTIFIER_COLUMN},
                {time_column} as {constants.FORECAST_TIME_COLUMN},
                predicted_{target_column_name} as {constants.FORECAST_TARGET_COLUMN}
            FROM `{prediction_table_id}`"""
        client.create_table(view)

        return view_id

    def _train(
        self,
        dataset: dataset.Dataset,
        time_column: str,
        target_column: str,
        time_series_id_column: str,
        forecast_horizon: int,
        data_granularity_unit: str,
        data_granularity_count: int,
        optimization_objective: Optional[str] = None,
        column_specs: Optional[Dict[str, str]] = None,
        time_series_attribute_columns: Optional[List[str]] = None,
    ) -> models.Model:
        """Create a Vertex AI time series dataset from the training table and
        run an AutoML forecasting training job on it.

        Args:
            dataset (dataset.Dataset): Input dataset; its "train" portion is used.
            time_column (str): Name of the time column.
            target_column (str): Name of the target column.
            time_series_id_column (str): Name of the time series identifier column.
            forecast_horizon (int): Forecast horizon passed to the training job.
            data_granularity_unit (str): Granularity unit passed to the training job.
            data_granularity_count (int): Granularity count passed to the training job.
            optimization_objective (Optional[str]): Objective passed to the job.
            column_specs (Optional[Dict[str, str]]): Column specs passed to the job.
            time_series_attribute_columns (Optional[List[str]]): Attribute
                columns passed to the training run.

        Returns:
            models.Model: The value returned by the training job's ``run``.
        """
        # One uuid ties together the dataset, job and model display names.
        uuid = utils.generate_uuid()

        # Get training data BigQuery source uri
        train_table_id = dataset.get_bigquery_table_id(
            time_column=time_column, dataset_portion="train"
        )
        train_bq_source = f"bq://{train_table_id}"

        # Create Vertex AI time series dataset
        timeseries_dataset = aiplatform.TimeSeriesDataset.create(
            display_name=f"timeseries_{uuid}",
            bq_source=train_bq_source,
        )

        # Create AutoML forecasting training job
        training_job = aiplatform.AutoMLForecastingTrainingJob(
            display_name=f"automl-job-{uuid}",
            optimization_objective=optimization_objective,
            column_specs=column_specs,
        )

        # Start running the training pipeline
        return training_job.run(
            dataset=timeseries_dataset,
            target_column=target_column,
            time_column=time_column,
            time_series_identifier_column=time_series_id_column,
            available_at_forecast_columns=[time_column],
            unavailable_at_forecast_columns=[target_column],
            time_series_attribute_columns=time_series_attribute_columns,
            forecast_horizon=forecast_horizon,
            data_granularity_unit=data_granularity_unit,
            data_granularity_count=data_granularity_count,
            model_display_name=f"automl-{uuid}",
        )

    def _evaluate(self, model_name: str) -> str:
        """Upload the first evaluation's metrics of a model to a new BigQuery table.

        Args:
            model_name (str): The model whose evaluation metrics are exported.

        Returns:
            str: The destination of the BigQuery load job, as a string.

        Raises:
            ValueError: If the model has no evaluations.
        """
        # Get the model resource
        model = aiplatform.Model(model_name=model_name)

        # Fetch the evaluations once: the same list is both checked and used,
        # and the remote call is not repeated.
        evaluations = model.list_model_evaluations()
        if not evaluations:
            raise ValueError(
                f"Model evaluation data does not exist for model {model_name}!"
            )

        # Parse evaluation data into a two-column (metric, value) frame
        evaluation_metrics = evaluations[0].to_dict()["metrics"]
        evaluation_metrics_df = pd.DataFrame(
            evaluation_metrics.items(), columns=["metric", "value"]
        )

        # Construct a BigQuery client object.
        client = bigquery.Client()
        project_id = client.project
        dataset_id = utils.generate_uuid()

        # Create evaluation dataset in default region
        bq_dataset = bigquery.Dataset(f"{project_id}.{dataset_id}")
        bq_dataset = client.create_dataset(bq_dataset, exists_ok=True)

        # Create a bq table in the dataset and upload the evaluation metrics
        table_id = f"{project_id}.{dataset_id}.automl-evaluation"

        job_config = bigquery.LoadJobConfig(
            # The schema is used to assist in data type definitions.
            schema=[
                bigquery.SchemaField("metric", bigquery.enums.SqlTypeNames.STRING),
                bigquery.SchemaField("value", bigquery.enums.SqlTypeNames.FLOAT64),
            ],
            # Optionally, set the write disposition. BigQuery appends loaded rows
            # to an existing table by default, but with WRITE_TRUNCATE write
            # disposition it replaces the table with the loaded data.
            write_disposition="WRITE_TRUNCATE",
        )

        job = client.load_table_from_dataframe(
            dataframe=evaluation_metrics_df,
            destination=table_id,
            job_config=job_config,
        )

        # Wait for the job to complete.
        job.result()

        return str(job.destination)

    def _predict(self, model_name: str, bigquery_source: str) -> str:
        """This function runs the batch prediction job

        Args:
            model_name (str): The model used for batch prediction.
            bigquery_source (str): The BigQuery source URI for batch prediction.

        Returns:
            str: The table id of batch prediction results.
        """
        client = bigquery.Client()
        project_id = client.project

        model = aiplatform.Model(model_name=model_name)

        # sync=True: the call returns only after the job has run, so the
        # output info read below is available.
        job = model.batch_predict(
            job_display_name=f"automl_forecasting_{utils.generate_uuid()}",
            bigquery_source=bigquery_source,
            instances_format="bigquery",
            bigquery_destination_prefix=f"bq://{project_id}",
            predictions_format="bigquery",
            generate_explanation=True,
            sync=True,
        )

        # Strip the URI scheme so the result is a plain dotted table id.
        output_dataset = job.output_info.bigquery_output_dataset.replace("bq://", "")
        output_table = job.output_info.bigquery_output_table

        return f"{output_dataset}.{output_table}"

    def _prepare_test_dataset(
        self,
        context_window: int,
        forecast_horizon: int,
        model_parameters: Dict[str, Any],
        bigquery_source: str,
    ) -> str:
        """This function does the preprocessing job on the test data and saves
        the result in a BigQuery table

        The output keeps the first ``context_window`` days of the test data
        unchanged, followed by the next ``forecast_horizon`` days with the
        target column blanked out.

        Args:
            context_window (int): Sets how far back the model looks during
                training (and for forecasts).
            forecast_horizon (int): Determines how far into the future the
                model forecasts the target value for each row of prediction data.
            model_parameters (Dict[str, Any]): The model training parameters.
            bigquery_source (str): BigQuery uri of the table to be processed
                in format bq://project-id.dataset-id.table-id

        Returns:
            str: BigQuery uri of the destination table where the preprocess
            data is saved to in format bq://project-id.dataset-id.table-id

        Raises:
            ValueError: If a required parameter is missing or ``None``, or if
                the test table contains no rows.
        """
        time_column = _require_parameter(model_parameters, TIME_COLUMN_PARAMETER)
        target_column = _require_parameter(model_parameters, TARGET_COLUMN_PARAMETER)

        # Download dataset from BigQuery
        df_test = utils.download_bq_table(bq_table_uri=bigquery_source)

        if df_test.empty:
            # Without rows there is no start date to anchor the windows on.
            raise ValueError(f"Test dataset is empty: {bigquery_source}")

        # Parse before sorting so the order is chronological even when the
        # column holds date strings that do not sort lexicographically; only
        # then truncate timestamps to midnight.
        df_test[time_column] = pd.to_datetime(df_test[time_column])
        df_test = df_test.sort_values(by=time_column)
        df_test[time_column] = df_test[time_column].dt.normalize()

        start_date = df_test[time_column].iloc[0]

        # Store start and end dates for context and horizon.
        # NOTE(review): both windows are measured in days regardless of the
        # configured data granularity unit - confirm this is intended.
        date_context_window_start = start_date
        date_context_window_end = start_date + np.timedelta64(context_window, "D")
        time_horizon_end = date_context_window_end + np.timedelta64(
            forecast_horizon, "D"
        )

        # Extract dataframes for context and horizon (half-open intervals)
        df_test_context = df_test[
            (df_test[time_column] >= date_context_window_start)
            & (df_test[time_column] < date_context_window_end)
        ]
        df_test_horizon = df_test[
            (df_test[time_column] >= date_context_window_end)
            & (df_test[time_column] < time_horizon_end)
        ].copy()

        # Remove target for horizon (i.e. future dates)
        df_test_horizon[target_column] = None

        # Stitch context and horizon rows back into a single frame
        df_test = pd.concat([df_test_context, df_test_horizon])

        # Load the data to BigQuery
        destination_table = utils.save_dataframe_to_bigquery(
            dataframe=df_test, table_name="processed-test-data"
        )

        return f"bq://{destination_table}"