utils/bq_setup.py
"""
Bigquery setup related functions
"""
import os
import argparse
from os import listdir
from google.cloud import bigquery
from google.api_core.exceptions import BadRequest
# disabling for linting to pass
# pylint: disable = broad-exception-raised
DATABASE_PREFIX = os.getenv("DATABASE_PREFIX", "")
GCP_PROJECT = os.getenv("PROJECT_ID", None)
BQ_REGION = os.getenv("BQ_REGION", "US")
BQ_DATASET = DATABASE_PREFIX + os.getenv("BQ_DATASET", "lms_analytics")
bq_client = bigquery.Client(location=BQ_REGION)
# Print statements are being used in this file to debug in the Github actions
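# bigquery.Client authenticates via Application Default Credentials; for local
# runs, `gcloud auth application-default login` is one way to supply them.
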
def create_bigquery_dataset(dataset):
  """Create a dataset in BigQuery"""
  dataset_id = f"{GCP_PROJECT}.{dataset}"
  print("Dataset_id", dataset_id)
  dataset = bigquery.Dataset(dataset_id)
  try:
    bq_client.create_dataset(dataset, timeout=30)
    print(f"{dataset_id} dataset created")
  except Exception as e:  # pylint: disable = broad-except
    print("Error raised", e)
def create_table_using_sql_file(dataset, file_name):
  """Create a table using the query from the given file in the BQ dataset"""
  dataset_id = f"{GCP_PROJECT}.{dataset}"
  print("Dataset_id", dataset_id)
  old_dataset = os.getenv("BQ_DATASET", "lms_analytics")
  job_config = bigquery.QueryJobConfig(default_dataset=dataset_id)
  with open(file_name, "r", encoding="utf-8") as file:
    query = file.read()
  # Point the SQL at the (possibly prefixed) target dataset.
  query = query.replace(old_dataset, dataset)
  query_job = bq_client.query(query,
                              project=GCP_PROJECT,
                              job_config=job_config)
  try:
    query_job.result()
    print("Completed for the file: ", file_name)
  except BadRequest as exc:
    # Collect every error message instead of overwriting with the last one,
    # and avoid shadowing the caught exception.
    error = "; ".join(f"ERROR: {err['message']}" for err in query_job.errors)
    raise Exception(error) from exc
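# Illustrative call (the file path here is an assumption, not a guaranteed
# part of the repo layout):
#   create_table_using_sql_file(BQ_DATASET, "tables_sql/lms_analytics/logs.sql")
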
def get_sql_files_list(bq_dataset_name, folder_name):
  """Returns the list of SQL file paths in the folder named after the given dataset"""
  file_path = os.path.join(folder_name, bq_dataset_name)
  if not os.path.exists(file_path):
    return []
  sql_files = listdir(file_path)
  return [os.path.join(file_path, i) for i in sql_files]
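# Example of the expected shape (file names are illustrative):
#   get_sql_files_list("lms_analytics", "tables_sql")
#   -> ["tables_sql/lms_analytics/a.sql", "tables_sql/lms_analytics/b.sql"]
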
def create_tables(dataset):
  """Create tables in BigQuery"""
  print("dataset", dataset)
  sql_file_list = get_sql_files_list(os.getenv("BQ_DATASET", "lms_analytics"),
                                     "tables_sql")
  if sql_file_list:
    for each_file in sql_file_list:
      print(f"Running ddl_file: {each_file}")
      create_table_using_sql_file(dataset, each_file)
  else:
    print("No files exist at the given path")
def create_views(dataset):
  """Create views in BigQuery"""
  print("dataset", dataset)
  sql_file_list = get_sql_files_list(os.getenv("BQ_DATASET", "lms_analytics"),
                                     "views_sql")
  if sql_file_list:
    # Sort by the trailing characters of the path so the view files run in a
    # deterministic (suffix) order.
    sql_file_list.sort(key=lambda x: x[-7:])
    for each_file in sql_file_list:
      print(f"Running ddl_file: {each_file}")
      create_table_using_sql_file(dataset, each_file)
  else:
    print("No files exist at the given path")
def alter_table_using_sql_file(dataset, file_name):
  """Alter a table using the query from the given file in the BQ dataset"""
  dataset_id = f"{GCP_PROJECT}.{dataset}"
  print("Dataset_id", dataset_id)
  old_dataset = os.getenv("BQ_DATASET", "lms_analytics")
  job_config = bigquery.QueryJobConfig(default_dataset=dataset_id)
  with open(file_name, "r", encoding="utf-8") as file:
    query = file.read()
  query = query.replace(old_dataset, dataset)
  query_job = bq_client.query(query,
                              project=GCP_PROJECT,
                              job_config=job_config)
  try:
    query_job.result()
    print("Completed for the file: ", file_name)
  except BadRequest as exc:
    error = "; ".join(f"ERROR: {err['message']}" for err in query_job.errors)
    raise Exception(error) from exc
def alter_tables(dataset):
  """Alter tables in BigQuery"""
  print("dataset", dataset)
  sql_file_list = get_sql_files_list(os.getenv("BQ_DATASET", "lms_analytics"),
                                     "alter_tables_sql")
  if sql_file_list:
    for each_file in sql_file_list:
      print(f"Running ddl_file: {each_file}")
      alter_table_using_sql_file(dataset, each_file)
  else:
    print("No files exist at the given path")
def parse_arguments():
  """Parse the given arguments"""
  parser = argparse.ArgumentParser()
  parser.add_argument("--create-dataset",
                      dest="create_dataset",
                      type=str,
                      default="false",
                      choices=["true", "false"],
                      help="Create dataset for lms_analytics? true or false")
  parser.add_argument(
      "--create-tables",
      dest="create_tables",
      type=str,
      default="true",
      choices=["true", "false"],
      help="Create tables for logs in the lms_analytics dataset? true or false")
  parser.add_argument(
      "--alter-tables",
      dest="alter_tables",
      type=str,
      default="false",
      choices=["true", "false"],
      help="Alter tables in the lms_analytics dataset? true or false")
  parser.add_argument(
      "--create-views",
      dest="create_views",
      type=str,
      default="true",
      choices=["true", "false"],
      help="Create views for logs in the lms_analytics dataset? true or false")
  return parser.parse_args()
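# Note: argparse leaves these flags as the strings "true"/"false", which is why
# the __main__ block below compares against string literals.
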
if __name__ == "__main__":
# To run locally, appropriate BQ permissions should be there and
# be logged in to the respective GCP project using google sdk
# Set the following environment variables
# BQ_DATASET -<dataset-name> (default -> lms_analytics),
# GCP_PROJECT -<project_id>
# Run the following command with the required arguments to trigger the script
# cd utils
# PYTHONPATH=../common/src python3 bq_setup.py
# For altering the tables pass these as args:
# --create-tables=false --create-views=false --alter-tables=true
args = parse_arguments()
if not GCP_PROJECT:
raise Exception("Please set 'GCP_PROJECT' environment variable")
if args.create_dataset == "true":
create_bigquery_dataset(BQ_DATASET)
if args.create_tables == "true":
create_tables(BQ_DATASET)
if args.create_views == "true":
create_views(BQ_DATASET)
if args.alter_tables == "true":
alter_tables(BQ_DATASET)