{
"cells": [
{
"cell_type": "markdown",
"metadata": {},
"source": [
"<div style=\"display: flex; align-items: left;\">\n",
" <a href=\"https://sites.google.com/corp/google.com/genai-solutions/home?authuser=0\">\n",
" <img src=\"../utilities/imgs/aaie.png\" style=\"margin-right\">\n",
" </a>\n",
"</div>"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"id": "copyright"
},
"outputs": [],
"source": [
"# Copyright 2024 Google LLC\n",
"#\n",
"# Licensed under the Apache License, Version 2.0 (the \"License\");\n",
"# you may not use this file except in compliance with the License.\n",
"# You may obtain a copy of the License at\n",
"#\n",
"# https://www.apache.org/licenses/LICENSE-2.0\n",
"#\n",
"# Unless required by applicable law or agreed to in writing, software\n",
"# distributed under the License is distributed on an \"AS IS\" BASIS,\n",
"# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.\n",
"# See the License for the specific language governing permissions and\n",
"# limitations under the License."
]
},
{
"cell_type": "markdown",
"metadata": {
"id": "DRyGcAepAPJ5"
},
"source": [
"\n",
"# **Open Data QnA: Set up Dataset on CloudSQL for PostgreSQL**\n",
"\n",
"---\n",
"\n",
"This notebook shows how to copy a BigQuery public dataset to CloudSQL for PostgreSQL\n",
"\n",
"\n",
"This is accomplished through the three following steps: \n",
"> i. Set up PostgreSQL instance and databae on Google Cloud SQL\n",
"\n",
"> ii. Copy the BigQuery table to Cloud Storage Bucket\n",
"\n",
"> iii. Create the table in PostgreSQL databae using csv file in Cloud Storage Bucket\n",
"\n"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### **Change your Kernel to the created .venv with poetry from README.md**\n",
"\n",
"Below is the Kernel how it should look like before you proceed\n",
"\n",
""
]
},
{
"cell_type": "markdown",
"metadata": {
"id": "p4W6FPnrYEE8"
},
"source": [
"## 🔗 **1. Connect Your Google Cloud Project**\n",
"Time to connect your Google Cloud Project to this notebook. "
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"#@markdown Please fill in the value below with your GCP project ID and then run the cell.\n",
"PROJECT_ID = input(\"Enter the project id (same as your Setup Project) to copy source data in bigquery for this solution\")\n",
"\n",
"# Quick input validations.\n",
"assert PROJECT_ID, \"⚠️ Please provide your Google Cloud Project ID\"\n",
"\n",
"# Configure gcloud.\n",
"!gcloud config set project {PROJECT_ID}\n",
"print(f'Project has been set to {PROJECT_ID}')\n",
"\n"
]
},
{
"cell_type": "markdown",
"metadata": {
"id": "yygMe6rPWxHS"
},
"source": [
"## 🔐 **2. Authenticate to Google Cloud**\n",
"Authenticate to Google Cloud as the IAM user logged into this notebook in order to access your Google Cloud Project.\n",
"\n",
"You can do this within Google Colab or using the Application Default Credentials in the Google Cloud CLI."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"id": "PTXN1_DSXj2b"
},
"outputs": [],
"source": [
"\"\"\"Colab Auth\"\"\" \n",
"# from google.colab import auth\n",
"# auth.authenticate_user()\n",
"\n",
"\n",
"\"\"\"Jupiter Notebook Auth\"\"\"\n",
"import google.auth\n",
"import os\n",
"\n",
"credentials, project_id = google.auth.default()\n",
"\n",
"os.environ['GOOGLE_CLOUD_QUOTA_PROJECT']=PROJECT_ID\n",
"os.environ['GOOGLE_CLOUD_PROJECT']=PROJECT_ID"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"#Enable all the required APIs for the COPY\n",
"\n",
"!gcloud services enable \\\n",
" cloudapis.googleapis.com \\\n",
" compute.googleapis.com \\\n",
" iam.googleapis.com \\\n",
" sqladmin.googleapis.com \\\n",
" bigquery.googleapis.com --project {PROJECT_ID}"
]
},
{
"cell_type": "markdown",
"metadata": {
"id": "noWgbDQQO7mr"
},
"source": [
"## ☁️ **3. Set up Cloud SQL PostgreSQL Instance** \n",
"A **Postgres** Cloud SQL instance is required for the following stages of this notebook.\n",
"\n",
"To connect and access our Postgres Cloud SQL database instance(s) we will leverage the [Cloud SQL Python Connector](https://github.com/GoogleCloudPlatform/cloud-sql-python-connector).\n",
"\n",
"The Cloud SQL Python Connector is a library that can be used alongside a database driver to allow users to easily connect to a Cloud SQL database without having to manually allowlist IP or manage SSL certificates. "
]
},
{
"cell_type": "markdown",
"metadata": {
"id": "ypjpse8yBRdI"
},
"source": [
"💽 **Create a Postgres Instance**\n",
"\n",
"Running the below cell will verify the existence of a Cloud SQL instance or create a new one if one does not exist.\n",
"\n",
"> ⏳ - Creating a Cloud SQL instance may take a few minutes."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"cellView": "form",
"id": "_vIX7rNtVLhn"
},
"outputs": [],
"source": [
"#@markdown Please fill in the both the Google Cloud region and name of your Cloud SQL instance. Once filled in, run the cell.\n",
"\n",
"# Below are the recommended defaults; These can be changed to values of your choice\n",
"PG_REGION = \"us-central1\" #@param {type:\"string\"}\n",
"PG_INSTANCE = \"pg15-opendataqna\"\n",
"PG_DATABASE = \"opendataqna-db\"\n",
"PG_USER = \"pguser\"\n",
"PG_PASSWORD = \"pg123\"\n",
"\n",
"# Quick input validations.\n",
"assert PG_REGION, \"us-central1\"\n",
"assert PG_INSTANCE, \"pg15-opendataqna\"\n",
"\n",
"# check if Cloud SQL instance exists in the provided region and create it if it does not exist\n",
"database_version = !gcloud sql instances describe {PG_INSTANCE} --format=\"value(databaseVersion)\"\n",
"if database_version[0].startswith(\"POSTGRES\"):\n",
" print(\"Found existing Postgres Cloud SQL Instance!\")\n",
"else:\n",
" print(\"Creating new Cloud SQL instance...\")\n",
" !gcloud sql instances create {PG_INSTANCE} --database-version=POSTGRES_15 \\\n",
" --region={PG_REGION} --cpu=1 --memory=4GB --root-password={PG_PASSWORD} \\\n",
" --database-flags=cloudsql.iam_authentication=On\n",
"\n",
"# Create a database on the instance and a user with password\n",
"!gcloud sql databases create {PG_DATABASE} --instance={PG_INSTANCE}\n",
"!gcloud sql users create {PG_USER} \\\n",
"--instance={PG_INSTANCE} \\\n",
"--password={PG_PASSWORD}"
]
},
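{
"cell_type": "markdown",
"metadata": {},
"source": [
"🔌 **Optional: test the connection**\n",
"\n",
"The cell below is a minimal sketch that uses the Cloud SQL Python Connector (described in step 3) together with the asyncpg driver to open a connection using the PG_* values defined above and print the server version. It only verifies connectivity and can be skipped."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"import asyncio\n",
"import asyncpg\n",
"from google.cloud.sql.connector import Connector\n",
"\n",
"async def test_connection():\n",
"    \"\"\"Minimal connectivity check against the Cloud SQL instance created above.\"\"\"\n",
"    loop = asyncio.get_running_loop()\n",
"    async with Connector(loop=loop) as connector:\n",
"        conn: asyncpg.Connection = await connector.connect_async(\n",
"            f\"{PROJECT_ID}:{PG_REGION}:{PG_INSTANCE}\",  # Cloud SQL instance connection name\n",
"            \"asyncpg\",\n",
"            user=PG_USER,\n",
"            password=PG_PASSWORD,\n",
"            db=PG_DATABASE,\n",
"        )\n",
"        version = await conn.fetchval(\"SELECT version()\")\n",
"        print(f\"Connected to: {version}\")\n",
"        await conn.close()\n",
"\n",
"await test_connection()"
]
},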
{
"cell_type": "markdown",
"metadata": {
"id": "nzb0dFO6C4h6"
},
"source": [
"## ➡️ **4. Migrate a public BigQuery database to PostgreSQL instance**\n",
"Let's migrate a public BigQuery dataset over to the newly created PostgreSQL instance. "
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### A) Set up a Google Cloud Storage Bucket \n",
"This bucket will be used to store the exported BigQuery public dataset."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"id": "0q5uFF0sJnWK"
},
"outputs": [],
"source": [
"from google.cloud import storage\n",
"from urllib.error import HTTPError\n",
"\n",
"# Choose a name for the bucket; You might have to choose a different name if the name below already exists\n",
"BUCKET_NAME = str(PROJECT_ID+'-opendataqna') #@param {type:\"string\"} \n",
"\n",
"# Creating a bucket\n",
"storage_client = storage.Client(project=PROJECT_ID)\n",
"\n",
"try: \n",
" bucket = storage_client.bucket(BUCKET_NAME)\n",
"\n",
" if bucket.exists(): \n",
" print(\"This bucket already exists.\")\n",
"\n",
" else:\n",
" bucket = storage_client.create_bucket(BUCKET_NAME)\n",
" print(f\"Bucket {bucket.name} created\")\n",
"\n",
"except:\n",
" print(\"⚠️ This bucket already exists in another project. Make sure to give your bucket a unique name.\")"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### B) Export BigQuery Dataset to the Bucket\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"#@markdown Please choose a BigQuery Public dataset to export. You can leave the default values. Once filled in, run the cell.\n",
"\n",
"# Below are the recommended defaults; You may choose a different database to export\n",
"BQ_PROJECT = \"bigquery-public-data\"\n",
"BQ_DATABASE = \"census_bureau_international\"\n",
"bq_tables = [] # Specify empty list to copy 'all' tables, or a Specific list, eg: [\"table1\", \"table3\", \"table10\"]\n",
"\n",
"\n",
"# Quick input validations.\n",
"assert BQ_PROJECT, \"⚠️ Please specify the BigQuery Project\"\n",
"assert BQ_DATABASE, \"⚠️ Please specify the BigQuery Database\"\n",
"\n",
"from google.cloud import bigquery\n",
"client = bigquery.Client(project=PROJECT_ID)\n",
"dataset_id = f'{BQ_PROJECT}.{BQ_DATABASE}'\n",
"\n",
"if not bq_tables:\n",
" bq_tables_obj = client.list_tables(dataset_id)\n",
" bq_tables = [table_obj.table_id for table_obj in bq_tables_obj]\n",
"\n",
"print(f'List of tables in {dataset_id}: {bq_tables}')\n",
"destination_uris = []\n",
"\n",
"for bq_table in bq_tables:\n",
" # Export the bigquery data to Google Bucket\n",
" destination_uri = f\"gs://{BUCKET_NAME}/{BQ_DATABASE}/{bq_table}.csv\"\n",
" dataset_ref = bigquery.DatasetReference(BQ_PROJECT, BQ_DATABASE)\n",
" table_ref = dataset_ref.table(bq_table)\n",
"\n",
" destination_uris.append(destination_uri)\n",
"\n",
" extract_job = client.extract_table(\n",
" table_ref,\n",
" destination_uri,\n",
" # Location must match that of the source table.\n",
" location=\"US\",\n",
" ) # API request\n",
" extract_job.result() # Waits for job to complete.\n",
"\n",
" print(f\"Exported {BQ_PROJECT}:{BQ_DATABASE}.{bq_table} to {destination_uri}\")"
]
},
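{
"cell_type": "markdown",
"metadata": {},
"source": [
"Optionally, you can confirm the export landed in the bucket by listing the objects under the dataset prefix. This is a small sketch that reuses the storage_client created earlier and assumes the export cell above completed without errors."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# List the exported CSV files in the bucket (optional check)\n",
"for blob in storage_client.list_blobs(BUCKET_NAME, prefix=BQ_DATABASE):\n",
"    print(f\"{blob.name} ({blob.size} bytes)\")"
]
},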
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### C) Retrieve Data Types and Formats \n",
"To migrate the exported .csv files to PostgreSQL, we need to fetch the Data Types and Format from the exported csv file.\n",
"This needs to be done as we're setting up the PostgreSQL table and columns first (and need to provide the columns in the setup).\n",
"We will load the .csv content into the table afterwards. "
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"import pandas as pd\n",
"\n",
"field_names_list = []\n",
"field_types_list = []\n",
"\n",
"for destination_uri in destination_uris:\n",
"\n",
" df = pd.read_csv(destination_uri)\n",
" field_names = df.columns\n",
" field_names_list.append(field_names)\n",
" print(f'Column Names: {field_names}\\n')\n",
" field_types = df.dtypes\n",
" field_types_list.append(field_types)\n",
" print(f'Column Names: {field_types}')\n"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### D) Build the SQL Query for Table Creation \n",
"Every database is different. To acommodate for different table structures depending on which BigQuery dataset is being loaded in, we will build the SQL query for creating the required PostgreSQL table dynamically. "
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"def get_sql(pg_schema, bq_table, field_names, field_types): \n",
"\n",
" cols = \"\" \n",
"\n",
" for i in range(len(field_names)): \n",
" cols += str(field_names[i]) +\" \"+ str(field_types[i])\n",
" if i < (len(field_names)-1): \n",
" cols += \", \"\n",
"\n",
"\n",
" sql = f\"\"\"CREATE TABLE {pg_schema}.{bq_table}({cols})\"\"\"\n",
"\n",
" return sql\n",
"\n",
"#Please specify the PGSchema or leave it as default (public)\n",
"PG_SCHEMA = BQ_DATABASE\n",
"create_table_sqls = []\n",
"\n",
"for bq_table, field_names, field_types in zip(bq_tables, field_names_list, field_types_list):\n",
"\n",
" sql = get_sql(PG_SCHEMA, bq_table, field_names, field_types)\n",
" print(f'\\nsql for creating {bq_table} PostgreSQL table: \\n{sql} \\n')\n",
" create_table_sqls.append(sql)\n"
]
},
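{
"cell_type": "markdown",
"metadata": {},
"source": [
"The CREATE TABLE statements printed above still contain pandas dtype names (object, int64, float64); the next step swaps them for PostgreSQL types via string replacement. For reference, the same mapping can also be written as an explicit dictionary. The helper below is a sketch for illustration only and is not used by the following cells."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Reference only: explicit pandas dtype -> PostgreSQL type mapping (not used by the cells below)\n",
"PG_TYPE_MAP = {\n",
"    \"object\": \"TEXT\",\n",
"    \"int64\": \"INTEGER\",\n",
"    \"float64\": \"DOUBLE PRECISION\",\n",
"    \"bool\": \"BOOLEAN\",\n",
"    \"datetime64[ns]\": \"TIMESTAMP\",\n",
"}\n",
"\n",
"def map_dtype_to_pg(dtype_name: str) -> str:\n",
"    \"\"\"Return the PostgreSQL type for a pandas dtype name, defaulting to TEXT.\"\"\"\n",
"    return PG_TYPE_MAP.get(dtype_name, \"TEXT\")\n",
"\n",
"print(map_dtype_to_pg(\"int64\"), map_dtype_to_pg(\"object\"))"
]
},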
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### E) Create the PostgreSQL Table"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"import asyncio \n",
"import asyncpg\n",
"from google.cloud.sql.connector import Connector\n",
"\n",
"async def create_pg_schema(PROJECT_ID,\n",
" PG_REGION,\n",
" PG_INSTANCE,\n",
" PG_PASSWORD, \n",
" PG_DATABASE,\n",
" PG_USER,\n",
" PG_SCHEMA): \n",
" \"\"\"Delete if PG Schema Exists and create a fresh copy\"\"\"\n",
" loop = asyncio.get_running_loop()\n",
" async with Connector(loop=loop) as connector:\n",
" # Create connection to Cloud SQL database\n",
" conn: asyncpg.Connection = await connector.connect_async(\n",
" f\"{PROJECT_ID}:{PG_REGION}:{PG_INSTANCE}\", # Cloud SQL instance connection name\n",
" \"asyncpg\",\n",
" user=f\"{PG_USER}\",\n",
" db=f\"{PG_DATABASE}\",\n",
" password=f\"{PG_PASSWORD}\"\n",
" )\n",
"\n",
" await conn.execute(f\"DROP SCHEMA IF EXISTS {PG_SCHEMA} CASCADE\") \n",
"\n",
" await conn.execute(f\"CREATE SCHEMA {PG_SCHEMA}\") \n",
"\n",
" await conn.close()\n",
"\n",
"\n",
"async def create_pg_table(PROJECT_ID,\n",
" PG_REGION,\n",
" PG_INSTANCE,\n",
" PG_PASSWORD,\n",
" bq_tables, \n",
" PG_DATABASE, \n",
" create_table_sqls,\n",
" PG_USER): \n",
" \"\"\"Create PG Table from BQ Schema\"\"\"\n",
" \n",
" \n",
" loop = asyncio.get_running_loop()\n",
" async with Connector(loop=loop) as connector:\n",
" # Create connection to Cloud SQL database\n",
" conn: asyncpg.Connection = await connector.connect_async(\n",
" f\"{PROJECT_ID}:{PG_REGION}:{PG_INSTANCE}\", # Cloud SQL instance connection name\n",
" \"asyncpg\",\n",
" user=f\"{PG_USER}\",\n",
" db=f\"{PG_DATABASE}\",\n",
" password=f\"{PG_PASSWORD}\"\n",
" )\n",
"\n",
" \n",
" for bq_table, sql in zip(bq_tables, create_table_sqls):\n",
" # Replace the Data Types to work with PostgreSQL supported ones \n",
" sql = sql.replace(\"object,\", \"TEXT,\").replace(\"int64\", \"INTEGER\").replace(\"float64\", \"DOUBLE PRECISION\")\n",
"\n",
"\n",
" await conn.execute(f\"DROP TABLE IF EXISTS {bq_table} CASCADE\")\n",
" \n",
" # Create the table.\n",
" await conn.execute(sql)\n",
"\n",
" await conn.close()\n",
"\n",
"# Delete schema if exists and create a fresh copy\n",
"await(create_pg_schema(PROJECT_ID, PG_REGION, PG_INSTANCE, PG_PASSWORD, PG_DATABASE, PG_USER, PG_SCHEMA))\n",
"# # Create PG Tables\n",
"await(create_pg_table(PROJECT_ID, PG_REGION, PG_INSTANCE, PG_PASSWORD, bq_tables, PG_DATABASE, create_table_sqls,PG_USER))"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### F) Import Data to PostgreSQL Table\n",
"The below cell will iterate through each export file on our Google Cloud Storage Bucket and load it to the PostgreSQL instance. \n",
"This may take a while, depending on the size of the BigQuery public dataset. You can optionally set the LIMIT parameter to limit how many export files will be loaded in. "
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"async def import_to_pg(PROJECT_ID,\n",
" PG_REGION,\n",
" PG_INSTANCE,\n",
" PG_USER,\n",
" PG_PASSWORD,\n",
" PG_DATABASE,\n",
" PG_SCHEMA,\n",
" bq_tables, \n",
" BUCKET_NAME): \n",
" from google.cloud import storage\n",
" import pandas as pd \n",
" import asyncio\n",
" import asyncpg\n",
" from google.cloud.sql.connector import Connector\n",
"\n",
" storage_client = storage.Client(project=PROJECT_ID)\n",
"\n",
" # bucket = storage_client.get_bucket(BUCKET_NAME)\n",
" # blobs = bucket.list_blobs()\n",
"\n",
" loop = asyncio.get_running_loop()\n",
" async with Connector(loop=loop) as connector:\n",
"\n",
"\n",
" # Create connection to Cloud SQL database\n",
" conn: asyncpg.Connection = await connector.connect_async(\n",
" f\"{PROJECT_ID}:{PG_REGION}:{PG_INSTANCE}\", # Cloud SQL instance connection name\n",
" \"asyncpg\",\n",
" user=f\"{PG_USER}\",\n",
" password=f\"{PG_PASSWORD}\",\n",
" db=f\"{PG_DATABASE}\",\n",
" )\n",
" \n",
" for bq_table in bq_tables:\n",
" URI = f\"gs://{BUCKET_NAME}/{PG_SCHEMA}/{bq_table}.csv\"\n",
" print(f'URI:{URI}')\n",
" df = pd.read_csv(URI)\n",
" df = df.dropna()\n",
" df.info() \n",
"\n",
" # Copy the dataframe to the table.\n",
" tuples = list(df.itertuples(index=False))\n",
"\n",
" await conn.copy_records_to_table(\n",
" bq_table, records=tuples, columns=list(df), schema_name=PG_SCHEMA, timeout=3600\n",
" )\n",
" await conn.close()\n",
"\n",
"# # Load Data into PG Table \n",
"await(import_to_pg(PROJECT_ID, PG_REGION, PG_INSTANCE, PG_USER, PG_PASSWORD, PG_DATABASE, PG_SCHEMA,bq_tables, BUCKET_NAME))"
]
},
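{
"cell_type": "markdown",
"metadata": {},
"source": [
"**Optional: verify the loaded tables**\n",
"\n",
"The cell below is a minimal sketch that reconnects with the Cloud SQL Python Connector and prints a row count for each migrated table. It assumes the cells above completed successfully."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"import asyncio\n",
"import asyncpg\n",
"from google.cloud.sql.connector import Connector\n",
"\n",
"async def count_rows():\n",
"    \"\"\"Print the row count of each migrated table.\"\"\"\n",
"    loop = asyncio.get_running_loop()\n",
"    async with Connector(loop=loop) as connector:\n",
"        conn: asyncpg.Connection = await connector.connect_async(\n",
"            f\"{PROJECT_ID}:{PG_REGION}:{PG_INSTANCE}\",  # Cloud SQL instance connection name\n",
"            \"asyncpg\",\n",
"            user=PG_USER,\n",
"            password=PG_PASSWORD,\n",
"            db=PG_DATABASE,\n",
"        )\n",
"        for bq_table in bq_tables:\n",
"            row_count = await conn.fetchval(f\"SELECT COUNT(*) FROM {PG_SCHEMA}.{bq_table}\")\n",
"            print(f\"{PG_SCHEMA}.{bq_table}: {row_count} rows\")\n",
"        await conn.close()\n",
"\n",
"await count_rows()"
]
},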
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### If all the above steps are executed suucessfully, the Bigquery public dataset should be copied to Cloud SQL for PostgreSQL on your GCP project"
]
}
],
"metadata": {
"colab": {
"provenance": []
},
"kernelspec": {
"display_name": "Python 3",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.11.9"
}
},
"nbformat": 4,
"nbformat_minor": 0
}