env_setup.py
import asyncio
from google.cloud import bigquery
import google.api_core.exceptions  # Needed for the NotFound exception used below
from embeddings import retrieve_embeddings, store_schema_embeddings, setup_kgq_table, load_kgq_df, store_kgq_embeddings
from utilities import ( PG_REGION, PG_INSTANCE, PG_DATABASE, PG_USER, PG_PASSWORD,
BQ_REGION,
EXAMPLES, LOGGING, VECTOR_STORE, PROJECT_ID,
BQ_OPENDATAQNA_DATASET_NAME,FIRESTORE_REGION)
import subprocess
import time
# Pick the region for the vector store dataset based on the configured backend
if VECTOR_STORE == 'bigquery-vector':
    DATASET_REGION = BQ_REGION
elif VECTOR_STORE == 'cloudsql-pgvector':
    DATASET_REGION = PG_REGION
else:
    raise ValueError(f"Unsupported VECTOR_STORE value: {VECTOR_STORE!r}")
def setup_postgresql(pg_instance, pg_region, pg_database, pg_user, pg_password):
"""Sets up a PostgreSQL Cloud SQL instance with a database and user.
Args:
pg_instance (str): Name of the Cloud SQL instance.
pg_region (str): Region where the instance should be located.
pg_database (str): Name of the database to create.
pg_user (str): Name of the user to create.
pg_password (str): Password for the user.
"""
# Check if Cloud SQL instance exists
describe_cmd = ["gcloud", "sql", "instances", "describe", pg_instance, "--format=value(databaseVersion)"]
describe_process = subprocess.run(describe_cmd, capture_output=True, text=True)
if describe_process.returncode == 0:
if describe_process.stdout.startswith("POSTGRES"):
print("Found existing Postgres Cloud SQL Instance!")
else:
raise RuntimeError("Existing Cloud SQL instance is not PostgreSQL")
else:
print("Creating new Cloud SQL instance...")
create_cmd = [
"gcloud", "sql", "instances", "create", pg_instance,
"--database-version=POSTGRES_15", "--region", pg_region,
"--cpu=1", "--memory=4GB", "--root-password", pg_password,
"--database-flags=cloudsql.iam_authentication=On"
]
        subprocess.run(create_cmd, check=True)  # Raises CalledProcessError if creation fails

        # Poll the instance state rather than sleeping for an arbitrary fixed period
        print("Waiting for instance to be ready...")
        state_cmd = ["gcloud", "sql", "instances", "describe", pg_instance, "--format=value(state)"]
        while subprocess.run(state_cmd, capture_output=True, text=True).stdout.strip() != "RUNNABLE":
            time.sleep(30)
# Create the database
list_cmd = ["gcloud", "sql", "databases", "list", "--instance", pg_instance]
list_process = subprocess.run(list_cmd, capture_output=True, text=True)
if pg_database in list_process.stdout:
print("Found existing Postgres Cloud SQL database!")
else:
print("Creating new Cloud SQL database...")
create_db_cmd = ["gcloud", "sql", "databases", "create", pg_database, "--instance", pg_instance]
subprocess.run(create_db_cmd, check=True)
# Create the user
create_user_cmd = [
"gcloud", "sql", "users", "create", pg_user,
"--instance", pg_instance, "--password", pg_password
]
subprocess.run(create_user_cmd, check=True)
print(f"PG Database {pg_database} in instance {pg_instance} is ready.")
def create_vector_store():
"""
Initializes the environment and sets up the vector store for Open Data QnA.
This function performs the following steps:
1. Loads configurations from the "config.ini" file.
2. Determines the data source (BigQuery or CloudSQL PostgreSQL) and sets the dataset region accordingly.
3. If the vector store is "cloudsql-pgvector" and the data source is not CloudSQL PostgreSQL, it creates a new PostgreSQL dataset for the vector store.
4. If logging is enabled or the vector store is "bigquery-vector", it creates a BigQuery dataset for the vector store and logging table.
    Note:
        Generating the schema/column embeddings and writing them to the vector store
        (including the column_details_embeddings table when BigQuery is used) are handled
        separately by get_embeddings() and store_embeddings() below, not by this function.
Configuration:
- Requires the following environment variables to be set in "config.ini":
- `DATA_SOURCE`: The data source (e.g., "bigquery" or "cloudsql-pg").
- `VECTOR_STORE`: The type of vector store (e.g., "bigquery-vector" or "cloudsql-pgvector").
- `BQ_REGION`: The BigQuery region.
- `PROJECT_ID`: The Google Cloud project ID.
- `BQ_OPENDATAQNA_DATASET_NAME`: The name of the BigQuery dataset for Open Data QnA.
- `LOGGING`: Whether logging is enabled.
- If `VECTOR_STORE` is "cloudsql-pgvector" and `DATA_SOURCE` is not "cloudsql-pg":
- Requires additional environment variables for PostgreSQL instance setup.
Returns:
None
Raises:
RuntimeError: If there are errors during the setup process (e.g., dataset creation failure).
"""
print("Initializing environment setup.")
print("Loading configurations from config.ini file.")
print("Vector Store source set to: ", VECTOR_STORE)
    # Create a dedicated Cloud SQL PostgreSQL instance when the vector store is cloudsql-pgvector
if VECTOR_STORE == 'cloudsql-pgvector' :
print("Generating pg dataset for vector store.")
# Parameters for PostgreSQL Instance
pg_region = DATASET_REGION
pg_instance = "pg15-opendataqna"
pg_database = "opendataqna-db"
pg_user = "pguser"
pg_password = "pg123"
pg_schema = 'pg-vector-store'
setup_postgresql(pg_instance, pg_region, pg_database, pg_user, pg_password)
# Create a new data set on Bigquery to use for the logs table
if LOGGING or VECTOR_STORE == 'bigquery-vector':
if LOGGING:
print("Logging is enabled")
if VECTOR_STORE == 'bigquery-vector':
print("Vector store set to 'bigquery-vector'")
print(f"Generating Big Query dataset {BQ_OPENDATAQNA_DATASET_NAME}")
client=bigquery.Client(project=PROJECT_ID)
dataset_ref = f"{PROJECT_ID}.{BQ_OPENDATAQNA_DATASET_NAME}"
# Create the dataset if it does not exist already
try:
client.get_dataset(dataset_ref)
print("Destination Dataset exists")
        except google.api_core.exceptions.NotFound:
            print("Dataset not found; creating it...")
            dataset = bigquery.Dataset(dataset_ref)
            dataset.location = DATASET_REGION
            client.create_dataset(dataset)
            print(f"{dataset_ref} is created")
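# For reference, the values imported from utilities at the top of this file come from
# config.ini. A rough, illustrative sketch of the keys this module relies on (example
# values and layout are assumptions; the config.ini shipped with the repo is authoritative):
#
#   DATA_SOURCE = bigquery                         # or cloudsql-pg
#   VECTOR_STORE = bigquery-vector                 # or cloudsql-pgvector
#   LOGGING = yes
#   EXAMPLES = yes
#   PROJECT_ID = my-gcp-project
#   BQ_REGION = US
#   BQ_OPENDATAQNA_DATASET_NAME = opendataqna
#   FIRESTORE_REGION = nam5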
def get_embeddings():
"""Generates and returns embeddings for table schemas and column descriptions.
This function performs the following steps:
1. Retrieves table schema and column description data based on the specified data source (BigQuery or PostgreSQL).
2. Generates embeddings for the retrieved data using the configured embedding model.
3. Returns the generated embeddings for both tables and columns.
Returns:
Tuple[pd.DataFrame, pd.DataFrame]: A tuple containing two pandas DataFrames:
- table_schema_embeddings: Embeddings for the table schemas.
- col_schema_embeddings: Embeddings for the column descriptions.
    Configuration:
        The sources, schemas, and tables to embed are read from data_source_list.csv, which
        must contain the columns: source, user_grouping, schema, table. Supported values for
        "source" are "bigquery" and "cloudsql-pg".
"""
print("Generating embeddings from source db schemas")
import pandas as pd
import os
    # Search the current working directory, its subdirectories, and then each parent
    # directory (up to the user's home directory) for data_source_list.csv
    current_dir = os.getcwd()
    root_dir = os.path.expanduser('~')  # Upper bound for the upward search
    file_path = None
    while file_path is None and current_dir != root_dir:
        for dirpath, dirnames, filenames in os.walk(current_dir):
            if 'data_source_list.csv' in filenames:
                file_path = os.path.join(dirpath, 'data_source_list.csv')
                break  # File found; the while condition ends the outer loop
        parent = os.path.dirname(current_dir)
        if parent == current_dir:  # Reached the filesystem root without finding the file
            break
        current_dir = parent
    if file_path is None:
        raise FileNotFoundError("data_source_list.csv was not found in or above the current directory")
    print("Source Found at Path :: " + file_path)
# Load the file
df_src = pd.read_csv(file_path)
df_src = df_src.loc[:, ["source", "user_grouping", "schema","table"]]
df_src = df_src.sort_values(by=["source","user_grouping","schema","table"])
    # Error out if any schema entry is missing or empty
if df_src['schema'].astype(str).str.len().min()==0 or df_src['schema'].isna().any():
raise ValueError("Schema column cannot be empty")
    # Group by (source, schema), collecting the unique non-null table names for each
df=df_src.groupby(['source','schema'])['table'].agg(lambda x: list(x.dropna().unique())).reset_index()
df['table']=df['table'].apply(lambda x: None if pd.isna(x).any() else x)
print("The Embeddings are extracted for the below combinations")
print(df)
table_schema_embeddings=pd.DataFrame(columns=['source_type','join_by','table_schema', 'table_name', 'content','embedding'])
col_schema_embeddings=pd.DataFrame(columns=['source_type','join_by','table_schema', 'table_name', 'column_name', 'content','embedding'])
for _, row in df.iterrows():
DATA_SOURCE = row['source']
SCHEMA = row['schema']
TABLE_LIST = row['table']
_t, _c = retrieve_embeddings(DATA_SOURCE, SCHEMA=SCHEMA, table_names=TABLE_LIST)
_t["source_type"]=DATA_SOURCE
_c["source_type"]=DATA_SOURCE
if not TABLE_LIST:
_t["join_by"]=DATA_SOURCE+"_"+SCHEMA+"_"+SCHEMA
_c["join_by"]=DATA_SOURCE+"_"+SCHEMA+"_"+SCHEMA
table_schema_embeddings = pd.concat([table_schema_embeddings,_t],ignore_index=True)
col_schema_embeddings = pd.concat([col_schema_embeddings,_c],ignore_index=True)
df_src['join_by'] = df_src.apply(
lambda row: f"{row['source']}_{row['schema']}_{row['schema']}" if pd.isna(row['table']) else f"{row['source']}_{row['schema']}_{row['table']}",axis=1)
table_schema_embeddings['join_by'] = table_schema_embeddings['join_by'].fillna(table_schema_embeddings['source_type'] + "_" + table_schema_embeddings['table_schema'] + "_" + table_schema_embeddings['table_name'])
col_schema_embeddings['join_by'] = col_schema_embeddings['join_by'].fillna(col_schema_embeddings['source_type'] + "_" + col_schema_embeddings['table_schema'] + "_" + col_schema_embeddings['table_name'])
table_schema_embeddings = table_schema_embeddings.merge(df_src[['join_by', 'user_grouping']], on='join_by', how='left')
table_schema_embeddings.drop(columns=["join_by"],inplace=True)
    # Default user_grouping to "<schema>-<source_type>" where no grouping was provided
table_schema_embeddings['user_grouping'] = table_schema_embeddings['user_grouping'].fillna(table_schema_embeddings['table_schema']+"-"+table_schema_embeddings['source_type'])
col_schema_embeddings = col_schema_embeddings.merge(df_src[['join_by', 'user_grouping']], on='join_by', how='left')
col_schema_embeddings.drop(columns=["join_by"],inplace=True)
    # Default user_grouping to "<schema>-<source_type>" where no grouping was provided
col_schema_embeddings['user_grouping'] = col_schema_embeddings['user_grouping'].fillna(col_schema_embeddings['table_schema']+"-"+col_schema_embeddings['source_type'])
print("Table and Column embeddings are created")
return table_schema_embeddings, col_schema_embeddings
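# A small illustrative data_source_list.csv for get_embeddings(); the values below are
# hypothetical, only the column names are required by the code above:
#
#   source,user_grouping,schema,table
#   bigquery,sales-analytics,sales_ds,orders
#   bigquery,sales-analytics,sales_ds,customers
#   cloudsql-pg,hr-analytics,public,
#
# An empty "table" cell means no table filter is applied for that schema, and a missing
# "user_grouping" defaults to "<schema>-<source>" (see the fillna calls above).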
async def store_embeddings(table_schema_embeddings, col_schema_embeddings):
"""
Stores table and column embeddings into the specified vector store.
This asynchronous function saves precomputed embeddings for table schemas and column descriptions
into either BigQuery or PostgreSQL (with pgvector extension) based on the VECTOR_STORE configuration.
Args:
table_schema_embeddings (pd.DataFrame): Embeddings for the table schemas.
col_schema_embeddings (pd.DataFrame): Embeddings for the column descriptions.
Configuration:
This function relies on the following configuration variables:
- VECTOR_STORE: Determines the target vector store ("bigquery-vector" or "cloudsql-pgvector").
- PROJECT_ID, BQ_REGION, BQ_OPENDATAQNA_DATASET_NAME (if VECTOR_STORE is "bigquery-vector"):
Configuration for BigQuery storage.
- PG_INSTANCE, PG_DATABASE, PG_USER, PG_PASSWORD, PG_REGION (if VECTOR_STORE is "cloudsql-pgvector"):
Configuration for PostgreSQL storage.
Returns:
None
"""
print("Storing embeddings back to the vector store.")
if VECTOR_STORE=='bigquery-vector':
await(store_schema_embeddings(table_details_embeddings=table_schema_embeddings,
tablecolumn_details_embeddings=col_schema_embeddings,
project_id=PROJECT_ID,
instance_name=None,
database_name=None,
schema=BQ_OPENDATAQNA_DATASET_NAME,
database_user=None,
database_password=None,
region=BQ_REGION,
VECTOR_STORE = VECTOR_STORE
))
elif VECTOR_STORE=='cloudsql-pgvector':
await(store_schema_embeddings(table_details_embeddings=table_schema_embeddings,
tablecolumn_details_embeddings=col_schema_embeddings,
project_id=PROJECT_ID,
instance_name=PG_INSTANCE,
database_name=PG_DATABASE,
schema=None,
database_user=PG_USER,
database_password=PG_PASSWORD,
region=PG_REGION,
VECTOR_STORE = VECTOR_STORE
))
print("Table and Column embeddings are saved to vector store")
async def create_kgq_sql_table():
"""
Creates a table for storing Known Good Query (KGQ) embeddings in the vector store.
This asynchronous function conditionally sets up a table to store known good SQL queries and their embeddings,
which are used to provide examples to the LLM during query generation. The table is created only
if the `EXAMPLES` configuration variable is set to 'yes'. If not, it prints a warning message encouraging
the user to create a query cache for better results.
Configuration:
This function relies on the following configuration variables:
- EXAMPLES: Determines whether to create the KGQ table ('yes' to create).
- VECTOR_STORE: Specifies the target vector store ("bigquery-vector" or "cloudsql-pgvector").
- PROJECT_ID, BQ_REGION, BQ_OPENDATAQNA_DATASET_NAME (if VECTOR_STORE is "bigquery-vector"):
Configuration for BigQuery storage.
- PG_INSTANCE, PG_DATABASE, PG_USER, PG_PASSWORD, PG_REGION (if VECTOR_STORE is "cloudsql-pgvector"):
Configuration for PostgreSQL storage.
Returns:
None
"""
if EXAMPLES:
print("Creating kgq table in vector store.")
# Delete any old tables and create a new table to KGQ embeddings
if VECTOR_STORE=='bigquery-vector':
await(setup_kgq_table(project_id=PROJECT_ID,
instance_name=None,
database_name=None,
schema=BQ_OPENDATAQNA_DATASET_NAME,
database_user=None,
database_password=None,
region=BQ_REGION,
VECTOR_STORE = VECTOR_STORE
))
elif VECTOR_STORE=='cloudsql-pgvector':
await(setup_kgq_table(project_id=PROJECT_ID,
instance_name=PG_INSTANCE,
database_name=PG_DATABASE,
schema=None,
database_user=PG_USER,
database_password=PG_PASSWORD,
region=PG_REGION,
VECTOR_STORE = VECTOR_STORE
))
else:
print("⚠️ WARNING: No Known Good Queries are provided to create query cache for Few shot examples!")
print("Creating a query cache is highly recommended for best outcomes")
print("If no Known Good Queries for the dataset are availabe at this time, you can use 3_LoadKnownGoodSQL.ipynb to load them later!!")
async def store_kgq_sql_embeddings():
"""
Stores known good query (KGQ) embeddings into the specified vector store.
This asynchronous function reads known good SQL queries from the "known_good_sql.csv" file
and stores their embeddings in either BigQuery or PostgreSQL (with pgvector) depending on the
`VECTOR_STORE` configuration. This process is only performed if the `EXAMPLES` configuration
variable is set to 'yes'. Otherwise, a warning message is displayed, highlighting the
importance of creating a query cache.
Configuration:
- Requires the "known_good_sql.csv" file to be present in the project directory.
- Relies on the following configuration variables:
- `EXAMPLES`: Determines whether to store KGQ embeddings ('yes' to store).
- `VECTOR_STORE`: Specifies the target vector store ("bigquery-vector" or "cloudsql-pgvector").
- `PROJECT_ID`, `BQ_REGION`, `BQ_OPENDATAQNA_DATASET_NAME` (if VECTOR_STORE is "bigquery-vector"):
Configuration for BigQuery storage.
- `PG_INSTANCE`, `PG_DATABASE`, `PG_USER`, `PG_PASSWORD`, `PG_REGION` (if VECTOR_STORE is "cloudsql-pgvector"):
Configuration for PostgreSQL storage.
Returns:
None
"""
if EXAMPLES:
print("Reading contents of known_good_sql.csv")
# Load the contents of the known_good_sql.csv file into a dataframe
df_kgq = load_kgq_df()
print("Storing kgq embeddings in vector store table.")
# Add KGQ to the vector store
if VECTOR_STORE=='bigquery-vector':
await(store_kgq_embeddings(df_kgq,
project_id=PROJECT_ID,
instance_name=None,
database_name=None,
schema=BQ_OPENDATAQNA_DATASET_NAME,
database_user=None,
database_password=None,
region=BQ_REGION,
VECTOR_STORE = VECTOR_STORE
))
elif VECTOR_STORE=='cloudsql-pgvector':
await(store_kgq_embeddings(df_kgq,
project_id=PROJECT_ID,
instance_name=PG_INSTANCE,
database_name=PG_DATABASE,
schema=None,
database_user=PG_USER,
database_password=PG_PASSWORD,
region=PG_REGION,
VECTOR_STORE = VECTOR_STORE
))
print('kgq embeddings stored.')
else:
print("⚠️ WARNING: No Known Good Queries are provided to create query cache for Few shot examples!")
print("Creating a query cache is highly recommended for best outcomes")
print("If no Known Good Queries for the dataset are availabe at this time, you can use 3_LoadKnownGoodSQL.ipynb to load them later!!")
def create_firestore_db(firestore_region=FIRESTORE_REGION,firestore_database="opendataqna-session-logs"):
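    """
    Creates (or reuses) the Firestore database used for Open Data QnA session logs.

    Uses gcloud to check whether a Firestore database with the given name already exists
    in PROJECT_ID and creates it in the given region if it does not.

    Args:
        firestore_region (str): Location for the Firestore database (defaults to FIRESTORE_REGION).
        firestore_database (str): Name of the Firestore database.
    """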
# Check if Firestore database exists
database_exists_cmd = [
"gcloud", "firestore", "databases", "list",
"--filter", f"name=projects/{PROJECT_ID}/databases/{firestore_database}",
"--format", "value(name)" # Extract just the name if found
]
database_exists_process = subprocess.run(
database_exists_cmd, capture_output=True, text=True
)
if database_exists_process.returncode == 0 and database_exists_process.stdout:
if database_exists_process.stdout.startswith(f"projects/{PROJECT_ID}/databases/{firestore_database}"):
print("Found existing Firestore database with this name already!")
else:
raise RuntimeError("Issue with checking if the firestore db exists or not")
else:
# Create Firestore database
print("Creating new Firestore database...")
create_db_cmd = [
"gcloud", "firestore", "databases", "create",
"--database", firestore_database,
"--location", firestore_region,
"--project", PROJECT_ID
]
subprocess.run(create_db_cmd, check=True) # Raise exception on failure
# Potential wait for database readiness (optional)
time.sleep(30) # May not be strictly necessary for basic use
if __name__ == '__main__':
# Setup vector store for embeddings
create_vector_store()
# Generate embeddings for tables and columns
table_schema_embeddings, col_schema_embeddings = get_embeddings()
# Store table/column embeddings (asynchronous)
asyncio.run(store_embeddings(table_schema_embeddings, col_schema_embeddings))
# Create table for known good queries (if enabled)
asyncio.run(create_kgq_sql_table())
# Store known good query embeddings (if enabled)
asyncio.run(store_kgq_sql_embeddings())
create_firestore_db()