backend/time-series-forecasting/models/dataset.py (131 lines of code) (raw):

# Copyright 2022 Google LLC # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. import abc import dataclasses from datetime import datetime from functools import cached_property, cache from io import StringIO from typing import Any, Dict, List, Optional, Union import pandas as pd from google.cloud import bigquery import utils class Dataset(abc.ABC): id: str description: str display_name: str time_column: str icon: Optional[str] recommended_model_parameters: Optional[Dict[str, Dict[str, Any]]] recommended_prediction_parameters: Optional[Dict[str, Dict[str, Any]]] train_percentage: int = 0.8 @property @abc.abstractmethod def df(self) -> pd.DataFrame: """The full dataset dataframe, which can be quite large. Returns: pd.DataFrame: The dataset represented as a Pandas dataframe. """ pass @cached_property def columns(self) -> List[str]: return self.df.columns.tolist() @cached_property def df_preview(self) -> pd.DataFrame: return self.df.head() @cached_property def start_date(self) -> datetime: df = self.df time_values = pd.to_datetime(df[self.time_column]) return time_values.min() @cached_property def end_date(self) -> datetime: df = self.df time_values = pd.to_datetime(df[self.time_column]) return time_values.max() @cached_property def date_cutoff(self) -> datetime: # The cut-off date for dataset train/test split df = self.df dates_unique = df[self.time_column].unique() date_cutoff = sorted(dates_unique)[ round(len(dates_unique) * self.train_percentage) ] return date_cutoff @cached_property def df_train(self) -> pd.DataFrame: df = self.df # Split dataset based on date cut-off df_train = df[df[self.time_column] <= self.date_cutoff] return df_train @cached_property def df_test(self) -> pd.DataFrame: df = self.df # Split dataset based on date cut-off df_test = df[df[self.time_column] > self.date_cutoff] return df_test def as_response(self) -> Dict: df_preview = self.df_preview.fillna("").sort_values(self.time_column) df_preview["id"] = df_preview.index return { "id": self.id, "displayName": self.display_name, "description": self.description, "icon": self.icon, "startDate": self.start_date.strftime("%m/%d/%Y"), "endDate": self.end_date.strftime("%m/%d/%Y"), "columns": self.columns, "dfPreview": df_preview.to_dict("records"), "recommendedModelParameters": self.recommended_model_parameters, "recommendedPredictionParameters": self.recommended_prediction_parameters, } # @cache def get_bigquery_table_id( self, time_column: str, dataset_portion: Optional[str] = None ) -> str: """This function saves the dataset on BigQuery and returns the BigQuery bigquery distenation table uri. Args: time_column (str): Dataset time column name dataset_portion (str): `test` or `train`. This will return the main dataset (before split) if the data portion is None. Returns: str: BigQuery destination table ID. """ dataset_id = utils.generate_uuid() table_id = utils.generate_uuid() # Write dataset to BigQuery table client = bigquery.Client() project_id = client.project bq_dataset = bigquery.Dataset(f"{project_id}.{dataset_id}") bq_dataset = client.create_dataset(bq_dataset, exists_ok=True) job_config = bigquery.LoadJobConfig( # Specify a (partial) schema. All columns are always written to the # table. The schema is used to assist in data type definitions. schema=[ bigquery.SchemaField(time_column, bigquery.enums.SqlTypeNames.DATE), ], # Optionally, set the write disposition. BigQuery appends loaded rows # to an existing table by default, but with WRITE_TRUNCATE write # disposition it replaces the table with the loaded data. write_disposition="WRITE_TRUNCATE", ) # Reference: https://cloud.google.com/bigquery/docs/samples/bigquery-load-table-dataframe df = pd.DataFrame() if dataset_portion == "train": df = self.df_train elif dataset_portion == "test": df = self.df_test elif dataset_portion is None: df = self.df else: raise ValueError(f"Unknown dataset portion: {dataset_portion}") job = client.load_table_from_dataframe( dataframe=df, destination=f"{project_id}.{dataset_id}.{table_id}", job_config=job_config, ) # Make an API request. _ = job.result() # Wait for the job to complete. return str(job.destination) @dataclasses.dataclass class CSVDataset(Dataset): filepath_or_buffer: Union[str, StringIO] display_name: str time_column: str description: str icon: Optional[str] = None recommended_model_parameters: Optional[Dict[str, Dict[str, Any]]] = None recommended_prediction_parameters: Optional[Dict[str, Dict[str, Any]]] = None id: str = dataclasses.field(default_factory=utils.generate_uuid) @cached_property def df(self) -> pd.DataFrame: df = pd.read_csv(self.filepath_or_buffer) df[self.time_column] = pd.to_datetime(df[self.time_column], utc=True) return df.sort_values(self.time_column) @dataclasses.dataclass class VertexAIDataset(Dataset): id: str display_name: str time_column: str description: str project: str region: str icon: Optional[str] = None recommended_model_parameters: Optional[Dict[str, Dict[str, Any]]] = None recommended_prediction_parameters: Optional[Dict[str, Dict[str, Any]]] = None @cached_property def df(self) -> pd.DataFrame: # TODO: Pull dataframe from Vertex AI Dataset return pd.DataFrame()