backend/time-series-forecasting/utils.py (34 lines of code) (raw):
# Copyright 2022 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import random
import string
import pandas as pd
from google.cloud import bigquery
random.seed(1236)
# Generate a uuid of a specifed length(default=8)
def generate_uuid(length: int = 8) -> str:
return "".join(random.choices(string.ascii_lowercase + string.digits, k=length))
def download_bq_table(bq_table_uri: str) -> pd.DataFrame:
"""This function downloads a BigQuery table and returns a dataframe
Args:
bq_table_uri (str): BigQuery uri of the table
Returns:
pd.DataFrame: table data in dataframe format
"""
# Remove bq:// prefix if present
prefix = "bq://"
if bq_table_uri.startswith(prefix):
bq_table_uri = bq_table_uri[len(prefix) :]
client = bigquery.Client()
table = client.get_table(bq_table_uri)
rows = client.list_rows(table)
return rows.to_dataframe()
def save_dataframe_to_bigquery(dataframe: pd.DataFrame, table_name: str) -> str:
"""This function loads a dataframe to a new bigquery table
Args:
dataframe (pd.Dataframe): dataframe to be loaded to bigquery
table_name (str): name of the bigquery table that is being created
Returns:
str: table id of the destination bigquery table
"""
client = bigquery.Client()
project_id = client.project
dataset_id = generate_uuid()
bq_dataset = bigquery.Dataset(f"{project_id}.{dataset_id}")
bq_dataset = client.create_dataset(bq_dataset, exists_ok=True)
job_config = bigquery.LoadJobConfig(
# Specify a (partial) schema. All columns are always written to the
# table. The schema is used to assist in data type definitions.
schema=[
bigquery.SchemaField("date", bigquery.enums.SqlTypeNames.DATE),
],
# Optionally, set the write disposition. BigQuery appends loaded rows
# to an existing table by default, but with WRITE_TRUNCATE write
# disposition it replaces the table with the loaded data.
write_disposition="WRITE_TRUNCATE",
)
# Reference: https://cloud.google.com/bigquery/docs/samples/bigquery-load-table-dataframe
job = client.load_table_from_dataframe(
dataframe=dataframe,
destination=f"{project_id}.{dataset_id}.{table_name}",
job_config=job_config,
)
job.result()
return str(job.destination)