notebooks/mssql2postgresql/mssql-to-postgres-notebook.ipynb (919 lines of code) (raw):
{
"cells": [
{
"cell_type": "code",
"execution_count": null,
"id": "a5baf865-b719-4c0f-bc2c-4ac062cd7927",
"metadata": {},
"outputs": [],
"source": [
"# Copyright 2022 Google LLC\n",
"#\n",
"# Licensed under the Apache License, Version 2.0 (the \"License\");\n",
"# you may not use this file except in compliance with the License.\n",
"# You may obtain a copy of the License at\n",
"#\n",
"# https://www.apache.org/licenses/LICENSE-2.0\n",
"#\n",
"# Unless required by applicable law or agreed to in writing, software\n",
"# distributed under the License is distributed on an \"AS IS\" BASIS,\n",
"# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.\n",
"# See the License for the specific language governing permissions and\n",
"# limitations under the License."
]
},
{
"cell_type": "markdown",
"id": "1f4d4022-15d2-4171-9460-425dcd7d9334",
"metadata": {},
"source": [
"# MSSQL to PostgreSQL Migration\n",
"<table align=\"left\">\n",
"<td>\n",
" <a href=\"https://colab.research.google.com/github/GoogleCloudPlatform/dataproc-templates/blob/main/notebooks/mssql2postgresql/mssql-to-postgres-notebook.ipynb\">\n",
" <img src=\"../images/colab-logo-32px.png\" alt=\"Colab logo\" />Run in Colab\n",
" </a>\n",
"</td>\n",
"<td>\n",
" <a href=\"https://console.cloud.google.com/vertex-ai/colab/import/https:%2F%2Fraw.githubusercontent.com%2FGoogleCloudPlatform%2Fdataproc-templates%2Fmain%2Fnotebooks%2Fmssql2postgresql%2Fmssql-to-postgres-notebook.ipynb\">\n",
" <img src=\"../images/colab-enterprise-logo-32px.png\" alt=\"GCP Colab Enterprise logo\" />Run in Colab Enterprise\n",
" </a>\n",
"</td>\n",
"<td>\n",
" <a href=\"https://github.com/GoogleCloudPlatform/dataproc-templates/blob/main/notebooks/mssql2postgresql/mssql-to-postgres-notebook.ipynb\">\n",
" <img src=\"../images/github-logo-32px.png\" alt=\"GitHub logo\" />View on GitHub\n",
" </a>\n",
"</td>\n",
"<td>\n",
" <a href=\"https://console.cloud.google.com/vertex-ai/workbench/deploy-notebook?download_url=https://raw.githubusercontent.com/GoogleCloudPlatform/dataproc-templates/main/notebooks/mssql2postgresql/mssql-to-postgres-notebook.ipynb\">\n",
" <img src=\"../images/vertexai.jpg\" alt=\"Vertex AI logo\" />Open in Vertex AI Workbench\n",
" </a>\n",
"</td>\n",
"</table>"
]
},
{
"cell_type": "markdown",
"id": "f4e7cc8e-c949-4d7d-a9f8-1ecd0d827f4b",
"metadata": {},
"source": [
"#### References\n",
"- [DataprocPySparkBatchOp reference](https://google-cloud-pipeline-components.readthedocs.io/en/google-cloud-pipeline-components-1.0.0/google_cloud_pipeline_components.experimental.dataproc.html)\n",
"- [Kubeflow SDK Overview](https://www.kubeflow.org/docs/components/pipelines/sdk/sdk-overview/)\n",
"- [Dataproc Serverless in Vertex AI Pipelines tutorial](https://github.com/GoogleCloudPlatform/vertex-ai-samples/blob/main/notebooks/community/ml_ops/stage3/get_started_with_dataproc_serverless_pipeline_components.ipynb)\n",
"- [Build a Vertex AI Pipeline](https://cloud.google.com/vertex-ai/docs/pipelines/build-pipeline)\n",
"\n",
"This notebook is built to run a Vertex AI User-Managed Notebook using the default Compute Engine Service Account.\n",
"Check the Dataproc Serverless in Vertex AI Pipelines tutorial linked above to learn how to setup a different Service Account.\n",
"#### Permissions\n",
"Make sure that the service account used to run the notebook has the following roles:\n",
"- roles/aiplatform.serviceAgent\n",
"- roles/aiplatform.customCodeServiceAgent\n",
"- roles/storage.objectCreator\n",
"- roles/storage.objectViewer\n",
"- roles/dataproc.editor\n",
"- roles/dataproc.worker"
]
},
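{
"cell_type": "code",
"execution_count": null,
"id": "grant-iam-roles-example",
"metadata": {},
"outputs": [],
"source": [
"# Optional helper (a sketch, not part of the original template): grant one of the\n",
"# roles listed above to the service account running this notebook. <project-id> and\n",
"# <service-account-email> are placeholders; uncomment and repeat per role to use it.\n",
"# !gcloud projects add-iam-policy-binding <project-id> --member=\"serviceAccount:<service-account-email>\" --role=\"roles/dataproc.editor\""
]
},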
{
"cell_type": "markdown",
"id": "9c27acdd-4438-4000-9b45-8b79a8805e87",
"metadata": {},
"source": [
"## Contact\n",
"Share you feedback, ideas, thoughts [feedback-form](https://forms.gle/XXCJeWeCJJ9fNLQS6) \n",
"Questions, issues, and comments should be directed to dataproc-templates-support-external@googlegroups.com"
]
},
{
"cell_type": "markdown",
"id": "31512ac6-a760-4f95-bb17-e5fc81b7d995",
"metadata": {
"tags": []
},
"source": [
"## Step 1: Install Libraries\n",
"#### Run Step 1 one time for each new notebook instance\""
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "1feb899d-ea30-4ae5-9e19-3d3c85d5b663",
"metadata": {},
"outputs": [],
"source": [
"!pip3 install pymssql SQLAlchemy\n",
"!pip3 install --upgrade google-cloud-pipeline-components kfp --user -q\n",
"!pip3 install psycopg2"
]
},
{
"cell_type": "markdown",
"id": "46a06f58-2c6e-4350-962c-9c398f0a31f2",
"metadata": {},
"source": [
"#### Once you've installed the additional packages, you may need to restart the notebook kernel so it can find the packages.\n",
"\n",
"Uncomment & Run this cell if you have installed anything from above commands"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "4dc31237-ea30-475e-8254-b62765fec009",
"metadata": {},
"outputs": [],
"source": [
"# import os\n",
"# import IPython\n",
"# if not os.getenv(\"IS_TESTING\"):\n",
"# app = IPython.Application.instance()\n",
"# app.kernel.do_shutdown(True)"
]
},
{
"cell_type": "markdown",
"id": "84557378-e132-4cf1-9a05-a2c18d65d404",
"metadata": {},
"source": [
"## Step 2: Import Libraries"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "7f7649c8-a118-42d4-b385-c49be2cc1070",
"metadata": {},
"outputs": [],
"source": [
"import google.cloud.aiplatform as aiplatform\n",
"from kfp import dsl\n",
"from kfp import compiler\n",
"from datetime import datetime\n",
"import time\n",
"import copy\n",
"import json\n",
"import pandas as pd\n",
"\n",
"try:\n",
" from google_cloud_pipeline_components.experimental.dataproc import DataprocPySparkBatchOp\n",
"except ModuleNotFoundError:\n",
" from google_cloud_pipeline_components.v1.dataproc import DataprocPySparkBatchOp\n",
" \n",
"import sqlalchemy\n",
"from sqlalchemy import text\n",
"import pymssql\n",
"import math\n",
"from pathlib import Path\n",
"import os"
]
},
{
"cell_type": "markdown",
"id": "abd3ac39-28ab-4339-8865-0a7024963bc6",
"metadata": {
"tags": []
},
"source": [
"## Step 3: Assign Parameters"
]
},
{
"attachments": {},
"cell_type": "markdown",
"id": "03c17a10-6166-4c6b-9297-127dcbb8c1be",
"metadata": {},
"source": [
"### Step 3.1 Common Parameters\n",
"##### PROJECT : GCP project-id\n",
"##### REGION : GCP region\n",
"##### GCS_STAGING_LOCATION : Cloud Storage staging location to be used for this notebook to store artifacts\n",
"##### SUBNET : VPC subnet\n",
"##### JARS : list of jars. For this notebook mssql connectora and postgres connectorjar is required in addition with the dataproc template \n",
"##### MAX_PARALLELISM : Parameter for number of jobs to run in parallel default value is 2\n",
"##### DATAPROC_SERVICE_ACCOUNT : Service account which will initiate serverless dataproc job"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "821480e9-c9a1-42eb-9604-dc33c5881689",
"metadata": {},
"outputs": [],
"source": [
"# Get GCP Project\n",
"\n",
"PROJECT = \"<project-id>\"\n",
"REGION = \"<region>\"\n",
"GCS_STAGING_LOCATION = \"<gs://bucket/[folder]>\"\n",
"SUBNET = \"<projects/{project}/regions/{region}/subnetworks/{subnet}>\"\n",
"MAX_PARALLELISM = 5 # max number of tables which will migrated parallelly\n",
"DATAPROC_SERVICE_ACCOUNT = \"\" # eg: test@project_id.iam.gserviceaccount.com \n"
]
},
{
"cell_type": "markdown",
"id": "66d0d0a9-5556-494d-bde4-0e9998795e5f",
"metadata": {},
"source": [
"### Step 3.2 MSSQL Parameters\n",
"#### MSSQL_HOST : MSSQL instance ip address\n",
"#### MSSQL_PORT : MSSQL instance port\n",
"#### MSSQL_USERNAME : MSSQL username\n",
"#### MSSQL_PASSWORD : MSSQL password\n",
"#### MSSQL_DATABASE : name of database that you want to migrate\n",
"#### MSSQLTABLE_LIST : list of tables you want to migrate eg: ['table1','table2'] else provide an empty list for migration whole database eg : []\n",
"#### NUMBER_OF_PARTITIONS : The maximum number of partitions that can be used for parallelism in table reading and writing. Same value will be used for both input and output jdbc connection. Default set to 10"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "dacdf900-5609-45ba-81cc-cfa5d27685ed",
"metadata": {},
"outputs": [],
"source": [
"MSSQL_HOST=\"<host>\"\n",
"MSSQL_PORT=\"<port>\" \n",
"MSSQL_USERNAME=\"<username>\"\n",
"MSSQL_PASSWORD=\"<password>\"\n",
"MSSQL_DATABASE=\"<database>\"\n",
"MSSQLTABLE_LIST=[] # leave list empty for migrating complete database else provide tables as ['table1','table2']"
]
},
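{
"cell_type": "code",
"execution_count": null,
"id": "mssql-connectivity-smoke-test",
"metadata": {},
"outputs": [],
"source": [
"# Optional smoke test (a sketch; assumes this notebook has network access to the\n",
"# MSSQL instance). Uncomment to verify the credentials above before Step 4.\n",
"# conn = pymssql.connect(server=MSSQL_HOST, port=MSSQL_PORT, user=MSSQL_USERNAME,\n",
"#                        password=MSSQL_PASSWORD, database=MSSQL_DATABASE)\n",
"# print('Connected to', MSSQL_DATABASE)\n",
"# conn.close()"
]
},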
{
"cell_type": "markdown",
"id": "2e01a00f-0c36-44f1-93bd-d849c62d5bac",
"metadata": {
"tags": []
},
"source": [
"### Step 3.3 PostgreSQL Parameters\n",
"#### POSTGRES_HOST : PostgreSQL instance ip address\n",
"#### POSTGRES_PORT : PostgreSQL instance port\n",
"#### POSTGRES_USERNAME : PostgreSQL username\n",
"#### POSTGRES_PASSWORD : PostgreSQL password\n",
"#### POSTGRES_DATABASE : name of database that you want to migrate\n",
"#### OUTPUT_MODE : Output write mode (one of: append,overwrite,ignore,errorifexists)(Defaults to overwrite)\n",
"#### BATCH_SIZE : JDBC output batch size. Default set to 1000"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "35947f0a-860a-46c0-b89e-6be99dea113c",
"metadata": {},
"outputs": [],
"source": [
"POSTGRES_HOST=\"<host>\"\n",
"POSTGRES_PORT=\"<port>\"\n",
"POSTGRES_USERNAME=\"<username>\"\n",
"POSTGRES_PASSWORD=\"<password>\"\n",
"POSTGRES_DATABASE=\"<database>\"\n",
"JDBCTOJDBC_OUTPUT_MODE=\"<modeoverwrite>\" # one of append/overwrite/ignore/errorifexists\n",
"JDBCTOJDBC_OUTPUT_BATCH_SIZE=\"1000\""
]
},
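{
"cell_type": "code",
"execution_count": null,
"id": "postgres-connectivity-smoke-test",
"metadata": {},
"outputs": [],
"source": [
"# Optional smoke test (a sketch; assumes network access to the PostgreSQL instance).\n",
"# Uncomment to verify the credentials above.\n",
"# import psycopg2\n",
"# conn = psycopg2.connect(user=POSTGRES_USERNAME, password=POSTGRES_PASSWORD,\n",
"#                         dbname=POSTGRES_DATABASE, host=POSTGRES_HOST, port=POSTGRES_PORT)\n",
"# print('Connected to', POSTGRES_DATABASE)\n",
"# conn.close()"
]
},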
{
"cell_type": "markdown",
"id": "ba335fa7-d46f-4253-bc7f-b0cb5c303592",
"metadata": {},
"source": [
"### Step 3.4 Notebook Configuration Parameters\n",
"#### Below variables shoulld not be changed unless required"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "f204af9b",
"metadata": {},
"outputs": [],
"source": [
"cur_path = Path(os.getcwd())\n",
"WORKING_DIRECTORY = os.path.join(cur_path.parent.parent ,'python')\n",
"\n",
"# If the above code doesn't fetches the correct path please\n",
"# provide complete path to python folder in your dataproc \n",
"# template repo which you cloned \n",
"\n",
"# WORKING_DIRECTORY = \"/home/jupyter/dataproc-templates/python/\"\n",
"print(WORKING_DIRECTORY)"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "34123cf3-4ce0-49c6-b7b9-ed40f4fb199f",
"metadata": {},
"outputs": [],
"source": [
"PYMSSQL_DRIVER=\"mssql+pymssql\"\n",
"JDBC_INPUT_DRIVER=\"com.microsoft.sqlserver.jdbc.SQLServerDriver\"\n",
"JDBC_INPUT_URL=\"jdbc:sqlserver://{0}:{1};databaseName={2};user={3};password={4}\".format(MSSQL_HOST,MSSQL_PORT,MSSQL_DATABASE,MSSQL_USERNAME,MSSQL_PASSWORD)\n",
"MAIN_CLASS=\"com.google.cloud.dataproc.templates.main.DataProcTemplate\"\n",
"JDBC_OUTPUT_DRIVER=\"org.postgresql.Driver\"\n",
"JDBC_OUTPUT_URL=\"jdbc:postgresql://{0}:{1}/{2}?user={3}&password={4}&reWriteBatchedInserts=True\".format(POSTGRES_HOST,POSTGRES_PORT,POSTGRES_DATABASE,POSTGRES_USERNAME,POSTGRES_PASSWORD)\n",
"PACKAGE_EGG_FILE=\"dist/dataproc_templates_distribution.egg\"\n",
"\n",
"PIPELINE_ROOT = GCS_STAGING_LOCATION + \"/pipeline_root/dataproc_pyspark\"\n",
"MAIN_PYTHON_FILE = GCS_STAGING_LOCATION + \"/main.py\"\n",
"PYTHON_FILE_URIS = [GCS_STAGING_LOCATION + \"/dataproc_templates_distribution.egg\"]\n",
"\n",
"# Do not change this parameter unless you want to refer below JARS from new location\n",
"JARS = [GCS_STAGING_LOCATION + \"/jars/mssql-jdbc-6.4.0.jre8.jar\", GCS_STAGING_LOCATION + \"/jars/postgresql-42.2.6.jar\"]"
]
},
{
"cell_type": "markdown",
"id": "3ca6ce4f-cfd2-4726-8e80-9cb059a05550",
"metadata": {},
"source": [
"## Step 4: Generate MSSQL Table List\n",
"This step creates list of tables for migration. If MSSQLTABLE_LIST is kept empty all the tables in the MSSQL_DATABASE are listed for migration otherwise the provided list is used"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "3a7a89ee-7500-4ad1-b444-0971b40599f5",
"metadata": {},
"outputs": [],
"source": [
"SQLTABLE_LIST=MSSQLTABLE_LIST\n",
"if len(SQLTABLE_LIST) == 0:\n",
" DB = sqlalchemy.create_engine(\n",
" sqlalchemy.engine.url.URL.create(\n",
" drivername=PYMSSQL_DRIVER,\n",
" username=MSSQL_USERNAME,\n",
" password=MSSQL_PASSWORD,\n",
" database=MSSQL_DATABASE,\n",
" host=MSSQL_HOST,\n",
" port=MSSQL_PORT\n",
" )\n",
" )\n",
" with DB.connect() as conn:\n",
" print(\"connected to database\")\n",
" results = conn.execute(text('select TABLE_SCHEMA,TABLE_NAME from INFORMATION_SCHEMA.Tables')).fetchall()\n",
" print(\"Total Tables = \", len(results))\n",
" for row in results:\n",
" SQLTABLE_LIST.append(row[0]+\".\"+row[1])\n",
"\n",
"print(\"list of tables for migration :\")\n",
"print(SQLTABLE_LIST)"
]
},
{
"cell_type": "markdown",
"id": "dd9304c2-e713-482a-aa9e-ddc97578b98b",
"metadata": {},
"source": [
"## Step 5: Get Primary Keys for partition the tables\n",
"This step fetches primary key from MSSQL_DATABASE for the tables listed for migration"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "3ab28c89-4c2f-486e-9d7b-edfc89f0d5d0",
"metadata": {},
"outputs": [],
"source": [
"SQL_TABLE_PRIMARY_KEYS = {}\n",
"DB = sqlalchemy.create_engine(\n",
" sqlalchemy.engine.url.URL.create(\n",
" drivername=PYMSSQL_DRIVER,\n",
" username=MSSQL_USERNAME,\n",
" password=MSSQL_PASSWORD,\n",
" database=MSSQL_DATABASE,\n",
" host=MSSQL_HOST,\n",
" port=MSSQL_PORT\n",
" )\n",
" )\n",
"with DB.connect() as conn:\n",
" for table in SQLTABLE_LIST:\n",
" primary_keys = []\n",
" print(table)\n",
" results = conn.execute(text(\"SELECT COLUMN_NAME FROM INFORMATION_SCHEMA.TABLE_CONSTRAINTS T JOIN INFORMATION_SCHEMA.KEY_COLUMN_USAGE K ON K.CONSTRAINT_NAME=T.CONSTRAINT_NAME WHERE K.TABLE_NAME='{0}' AND K.TABLE_SCHEMA='{1}' AND T.CONSTRAINT_TYPE='PRIMARY KEY';\".format(table.split(\".\")[1],table.split(\".\")[0]))).fetchall()\n",
" for row in results:\n",
" primary_keys.append(row[0])\n",
" if primary_keys:\n",
" SQL_TABLE_PRIMARY_KEYS[table] = \",\".join(primary_keys)\n",
" else:\n",
" SQL_TABLE_PRIMARY_KEYS[table] = \"\"\n",
" "
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "59679f52-0774-4cd6-8822-b6e80fb08224",
"metadata": {},
"outputs": [],
"source": [
"pkDF = pd.DataFrame({\"table\" : SQLTABLE_LIST, \"primary_keys\": list(SQL_TABLE_PRIMARY_KEYS.values())})\n",
"print(\"Below are identified primary keys for migrating mssql table to postgres:\")\n",
"pkDF"
]
},
{
"cell_type": "markdown",
"id": "363d46ea-d3a7-4e52-b527-21bee93fd305",
"metadata": {},
"source": [
"## Step 6: Create JAR files and Upload to GCS\n",
"#### Run Step 6 one time for each new notebook instance"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "b3742b9a-143d-49fc-b43c-e56179c7f0f2",
"metadata": {},
"outputs": [],
"source": [
"%cd $WORKING_DIRECTORY"
]
},
{
"cell_type": "markdown",
"id": "c4087b4b-c5be-407d-b2bd-e5d6da5a0222",
"metadata": {},
"source": [
"#### Get JDBC Connector jars"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "a0fa7572-0a4b-49c7-9359-cf3ce4514b48",
"metadata": {},
"outputs": [],
"source": [
"%%bash\n",
"wget https://jdbc.postgresql.org/download/postgresql-42.2.6.jar\n",
"wget https://repo1.maven.org/maven2/com/microsoft/sqlserver/mssql-jdbc/6.4.0.jre8/mssql-jdbc-6.4.0.jre8.jar"
]
},
{
"cell_type": "markdown",
"id": "9e141fde-4e48-4953-833b-176e255037c9",
"metadata": {},
"source": [
"#### Build Dataproc Templates python package"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "6f399004-26bf-480a-9640-5cfb97994f4c",
"metadata": {},
"outputs": [],
"source": [
"! python ./setup.py bdist_egg --output=$PACKAGE_EGG_FILE"
]
},
{
"cell_type": "markdown",
"id": "5bea938a-1951-44d4-a18b-063b9d7c8bd4",
"metadata": {},
"source": [
"#### Copying JAR files to GCS_STAGING_LOCATION"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "33dd85ce-46f6-4b35-8997-2189cf1b2eee",
"metadata": {},
"outputs": [],
"source": [
"! gsutil cp main.py $GCS_STAGING_LOCATION/\n",
"! gsutil cp -r $PACKAGE_EGG_FILE $GCS_STAGING_LOCATION/\n",
"! gsutil cp mssql-jdbc-6.4.0.jre8.jar $GCS_STAGING_LOCATION/jars/mssql-jdbc-6.4.0.jre8.jar\n",
"! gsutil cp postgresql-42.2.6.jar $GCS_STAGING_LOCATION/jars/postgresql-42.2.6.jar\n"
]
},
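{
"cell_type": "code",
"execution_count": null,
"id": "verify-staged-artifacts",
"metadata": {},
"outputs": [],
"source": [
"# Optional sanity check: list the staged artifacts to confirm main.py, the egg\n",
"# package and both JDBC connector jars reached GCS_STAGING_LOCATION.\n",
"! gsutil ls $GCS_STAGING_LOCATION/\n",
"! gsutil ls $GCS_STAGING_LOCATION/jars/"
]
},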
{
"cell_type": "markdown",
"id": "cf52c73c-ad16-4932-a421-349f5f718df2",
"metadata": {},
"source": [
"## Step 7: Calculate Parallel Jobs for MSSQL to PostgreSQL\n",
"This step uses MAX_PARALLELISM parameter to calculate number of parallel jobs to run"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "f8bc06aa-298b-4d1f-9822-62264371a681",
"metadata": {},
"outputs": [],
"source": [
"COMPLETE_LIST = copy.deepcopy(SQLTABLE_LIST)\n",
"PARALLEL_JOBS = len(SQLTABLE_LIST)//MAX_PARALLELISM\n",
"JOB_LIST = []\n",
"while len(COMPLETE_LIST) > 0:\n",
" SUB_LIST = []\n",
" for i in range(MAX_PARALLELISM):\n",
" if len(COMPLETE_LIST)>0 :\n",
" SUB_LIST.append(COMPLETE_LIST[0])\n",
" COMPLETE_LIST.pop(0)\n",
" else:\n",
" break\n",
" JOB_LIST.append(SUB_LIST)\n",
"print(\"list of tables for execution : \")\n",
"print(JOB_LIST)\n"
]
},
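{
"cell_type": "code",
"execution_count": null,
"id": "inspect-job-batches",
"metadata": {},
"outputs": [],
"source": [
"# Illustration of the batching above: each inner list in JOB_LIST holds at most\n",
"# MAX_PARALLELISM tables, and each batch is submitted as one group of parallel\n",
"# Dataproc batch jobs in Step 10. For example, 12 tables with MAX_PARALLELISM=5\n",
"# produce batches of sizes 5, 5 and 2.\n",
"print('Number of batches:', len(JOB_LIST))\n",
"print('Tables per batch :', [len(batch) for batch in JOB_LIST])"
]
},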
{
"cell_type": "markdown",
"id": "84fd6c0e-9128-4981-916e-f774e40f9120",
"metadata": {},
"source": [
"## Step 8: Get Row Count of Tables and identify Partition Columns \n",
"#### This step uses PARTITION_THRESHOLD(default value is 1 million) parameter and any table having rows greater than PARTITION_THRESHOLD will be partitioned based on Primary Keys\n",
"#### Get Primary keys for all tables to be migrated and find an integer column to partition on"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "bfa2bacc-b3b0-43f0-8825-8744010a25b2",
"metadata": {},
"outputs": [],
"source": [
"PARTITION_THRESHOLD=1000000 #\"Maximum Row Count Threshold for a Table\"\n",
"\n",
"CHECK_PARTITION_COLUMN_LIST={}\n",
"mssql_to_postgres_jobs = []"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "f9725435-79c2-4b40-9aa7-e64e7440fb88",
"metadata": {},
"outputs": [],
"source": [
"with DB.connect() as conn:\n",
" for table in SQLTABLE_LIST:\n",
" results = conn.execute(text(\"SELECT i.rowcnt FROM sysindexes AS i INNER JOIN sysobjects AS o ON i.id = o.id WHERE i.indid < 2 AND OBJECTPROPERTY(o.id, 'IsMSShipped') = 0 AND o.Name = '{0}'\".format(table.split(\".\")[1]))).fetchall()\n",
" if len(results)>0 and results[0][0]>int(PARTITION_THRESHOLD) and len(SQL_TABLE_PRIMARY_KEYS.get(table).split(\",\")[0])>0:\n",
" column_list=SQL_TABLE_PRIMARY_KEYS.get(table).split(\",\")\n",
" for column in column_list:\n",
" results_datatype = conn.execute(text(\"SELECT DATA_TYPE FROM INFORMATION_SCHEMA.COLUMNS WHERE TABLE_SCHEMA = '{0}' AND TABLE_NAME = '{1}' AND COLUMN_NAME = '{2}'\".format(table.split(\".\")[0],table.split(\".\")[1],column))).fetchall()\n",
" if results_datatype[0][0]==\"int\" or results_datatype[0][0]==\"bigint\":\n",
" lowerbound = conn.execute(text(\"SELECT min({0}) from {1}\".format(column,table))).fetchall()\n",
" upperbound = conn.execute(text(\"SELECT max({0}) from {1}\".format(column,table))).fetchall()\n",
" numberPartitions = math.ceil((upperbound[0][0]-lowerbound[0][0])/PARTITION_THRESHOLD)\n",
" CHECK_PARTITION_COLUMN_LIST[table]=[column,lowerbound[0][0],upperbound[0][0],numberPartitions]\n",
" \n",
" \n",
"print(CHECK_PARTITION_COLUMN_LIST)"
]
},
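{
"cell_type": "code",
"execution_count": null,
"id": "partition-plan-overview",
"metadata": {},
"outputs": [],
"source": [
"# Optional: display the partitioning plan computed above as a DataFrame for easier\n",
"# review. Tables below PARTITION_THRESHOLD are absent and will be read without\n",
"# explicit partitioning.\n",
"partitionDF = pd.DataFrame(\n",
"    [[table] + bounds for table, bounds in CHECK_PARTITION_COLUMN_LIST.items()],\n",
"    columns=['table', 'partition_column', 'lowerbound', 'upperbound', 'num_partitions']\n",
")\n",
"partitionDF"
]
},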
{
"cell_type": "markdown",
"id": "cae5f3ed-8ba4-4801-bb31-296221d64578",
"metadata": {},
"source": [
"## Step 9:Create Source Schemas in PostgreSQL"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "ad52bc86-1c77-4138-ab58-f849226b4137",
"metadata": {},
"outputs": [],
"source": [
"import psycopg2\n",
"postgresDB = psycopg2.connect(\n",
" user=POSTGRES_USERNAME,\n",
" password=POSTGRES_PASSWORD,\n",
" dbname=POSTGRES_DATABASE,\n",
" host=POSTGRES_HOST,\n",
" port=POSTGRES_PORT\n",
" )\n",
"postgresDB.autocommit = True\n",
"conn=postgresDB.cursor()\n",
"\n",
"for table in SQLTABLE_LIST:\n",
" conn.execute(text('''CREATE SCHEMA IF NOT EXISTS {};'''.format(table.split(\".\")[0])))\n",
"\n",
"conn.close()"
]
},
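{
"cell_type": "code",
"execution_count": null,
"id": "verify-postgres-schemas",
"metadata": {},
"outputs": [],
"source": [
"# Optional sanity check: list the schemas now present in the target PostgreSQL\n",
"# database to confirm the CREATE SCHEMA statements above succeeded.\n",
"cursor = postgresDB.cursor()\n",
"cursor.execute('SELECT schema_name FROM information_schema.schemata')\n",
"print([row[0] for row in cursor.fetchall()])\n",
"cursor.close()"
]
},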
{
"cell_type": "markdown",
"id": "11fce10a-6d1e-4fd6-b1f6-9175680614dd",
"metadata": {},
"source": [
"## Step 10: Execute Pipeline to Migrate tables from MSSQL to PostgreSQL"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "37abf9cd-d4df-4760-8dda-351ede03bf10",
"metadata": {},
"outputs": [],
"source": [
"def migrate_mssql_to_postgres(EXECUTION_LIST):\n",
" EXECUTION_LIST = EXECUTION_LIST\n",
" aiplatform.init(project=PROJECT,staging_bucket=GCS_STAGING_LOCATION)\n",
" \n",
" @dsl.pipeline(\n",
" name=\"python-mssql-to-postgres-pyspark\",\n",
" description=\"Pipeline to get data from mssql to postgres\",\n",
" )\n",
" \n",
" def pipeline(\n",
" PROJECT_ID: str = PROJECT,\n",
" LOCATION: str = REGION,\n",
" MAIN_PYTHON_CLASS: str = MAIN_PYTHON_FILE,\n",
" PYTHON_FILE_URIS: list = PYTHON_FILE_URIS,\n",
" JAR_FILE_URIS: list = JARS,\n",
" SUBNETWORK_URI: str = SUBNET,\n",
" SERVICE_ACCOUNT: str = DATAPROC_SERVICE_ACCOUNT\n",
" ):\n",
" for table in EXECUTION_LIST:\n",
" BATCH_ID = \"mssql2pg-{}-{}\".format(table,datetime.now().strftime(\"%s\")).replace('.','-').replace('_','-').lower()\n",
" mssql_to_postgres_jobs.append(BATCH_ID)\n",
" \n",
" \n",
" if table in CHECK_PARTITION_COLUMN_LIST.keys():\n",
" TEMPLATE_SPARK_ARGS = [\n",
" \"--template=JDBCTOJDBC\",\n",
" \"--jdbctojdbc.input.url={}\".format(JDBC_INPUT_URL),\n",
" \"--jdbctojdbc.input.driver={}\".format(JDBC_INPUT_DRIVER),\n",
" \"--jdbctojdbc.input.table={}\".format(table),\n",
" \"--jdbctojdbc.output.url={}\".format(JDBC_OUTPUT_URL),\n",
" \"--jdbctojdbc.output.driver={}\".format(JDBC_OUTPUT_DRIVER),\n",
" \"--jdbctojdbc.output.table={}\".format(table),\n",
" \"--jdbctojdbc.input.partitioncolumn={}\".format(CHECK_PARTITION_COLUMN_LIST[table][0]),\n",
" \"--jdbctojdbc.input.lowerbound={}\".format(CHECK_PARTITION_COLUMN_LIST[table][1]),\n",
" \"--jdbctojdbc.input.upperbound={}\".format(CHECK_PARTITION_COLUMN_LIST[table][2]),\n",
" \"--jdbctojdbc.numpartitions={}\".format(CHECK_PARTITION_COLUMN_LIST[table][3]),\n",
" \"--jdbctojdbc.output.mode={}\".format(JDBCTOJDBC_OUTPUT_MODE),\n",
" \"--jdbctojdbc.output.batch.size={}\".format(JDBCTOJDBC_OUTPUT_BATCH_SIZE)\n",
" ]\n",
" else:\n",
" TEMPLATE_SPARK_ARGS = [\n",
" \"--template=JDBCTOJDBC\",\n",
" \"--jdbctojdbc.input.url={}\".format(JDBC_INPUT_URL),\n",
" \"--jdbctojdbc.input.driver={}\".format(JDBC_INPUT_DRIVER),\n",
" \"--jdbctojdbc.input.table={}\".format(table),\n",
" \"--jdbctojdbc.output.url={}\".format(JDBC_OUTPUT_URL),\n",
" \"--jdbctojdbc.output.driver={}\".format(JDBC_OUTPUT_DRIVER),\n",
" \"--jdbctojdbc.output.table={}\".format(table),\n",
" \"--jdbctojdbc.output.mode={}\".format(JDBCTOJDBC_OUTPUT_MODE),\n",
" \"--jdbctojdbc.output.batch.size={}\".format(JDBCTOJDBC_OUTPUT_BATCH_SIZE)\n",
" ]\n",
" \n",
"\n",
" _ = DataprocPySparkBatchOp(\n",
" project=PROJECT_ID,\n",
" location=LOCATION,\n",
" batch_id=BATCH_ID,\n",
" main_python_file_uri=MAIN_PYTHON_CLASS,\n",
" jar_file_uris=JAR_FILE_URIS,\n",
" python_file_uris=PYTHON_FILE_URIS,\n",
" subnetwork_uri=SUBNETWORK_URI,\n",
" service_account=SERVICE_ACCOUNT,\n",
" runtime_config_version=\"1.1\", # issue 665\n",
" args=TEMPLATE_SPARK_ARGS\n",
" )\n",
" time.sleep(3)\n",
"\n",
" compiler.Compiler().compile(pipeline_func=pipeline, package_path=\"pipeline.json\")\n",
"\n",
" pipeline = aiplatform.PipelineJob(\n",
" display_name=\"pipeline\",\n",
" template_path=\"pipeline.json\",\n",
" pipeline_root=PIPELINE_ROOT,\n",
" enable_caching=False,\n",
" )\n",
" # run() method has an optional parameter `service_account` which you can pass if you want to run pipeline using\n",
" # specific service account instead of default service account \n",
" # eg. pipeline.run(service_account='test@project_id.iam.gserviceaccount.com')\n",
" pipeline.run()"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "18574192-8d4d-43c8-98e5-f3d613c98fab",
"metadata": {},
"outputs": [],
"source": [
"for execution_list in JOB_LIST:\n",
" print(execution_list)\n",
" migrate_mssql_to_postgres(execution_list)"
]
},
{
"cell_type": "markdown",
"id": "fb06f8bc-ef97-4665-8a5d-71e195c0d85d",
"metadata": {},
"source": [
"## Step 11: Get status for tables migrated from MSSQL to PostgreSQL"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "136f4298-d5fd-4d3b-9f9a-12ef82d35b97",
"metadata": {},
"outputs": [],
"source": [
"def get_bearer_token():\n",
" \n",
" try:\n",
" #Defining Scope\n",
" CREDENTIAL_SCOPES = [\"https://www.googleapis.com/auth/cloud-platform\"]\n",
"\n",
" #Assigning credentials and project value\n",
" credentials, project_id = google.auth.default(scopes=CREDENTIAL_SCOPES)\n",
"\n",
" #Refreshing credentials data\n",
" credentials.refresh(requests.Request())\n",
"\n",
" #Get refreshed token\n",
" token = credentials.token\n",
" if token:\n",
" return (token,200)\n",
" else:\n",
" return \"Bearer token not generated\"\n",
" except Exception as error:\n",
" return (\"Bearer token not generated. Error : {}\".format(error),500)"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "2a5a8a6d-ca0b-4d1b-b325-f347b83f619d",
"metadata": {},
"outputs": [],
"source": [
"from google.auth.transport import requests\n",
"import google\n",
"\n",
"token = get_bearer_token()\n",
"if token[1] == 200:\n",
" print(\"Bearer token generated\")\n",
"else:\n",
" print(token)"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "8dcbdd49-1125-446f-bfb4-52981fec44ea",
"metadata": {},
"outputs": [],
"source": [
"import requests\n",
"\n",
"mssql_to_postgres_status = []\n",
"job_status_url = \"https://dataproc.googleapis.com/v1/projects/{}/locations/{}/batches/{}\"\n",
"for job in mssql_to_postgres_jobs:\n",
" auth = \"Bearer \" + token[0]\n",
" url = job_status_url.format(PROJECT,REGION,job)\n",
" headers = {\n",
" 'Content-Type': 'application/json; charset=UTF-8',\n",
" 'Authorization': auth \n",
" }\n",
" response = requests.get(url, headers=headers)\n",
" mssql_to_postgres_status.append(response.json()[\"state\"])\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "dde0a504-0f21-466d-b63a-b5e760344e8a",
"metadata": {},
"outputs": [],
"source": [
"statusDF = pd.DataFrame({\"table\" : SQLTABLE_LIST,\"mssql_to_postgres_job\" : mssql_to_postgres_jobs, \"mssql_to_postgres_status\" : mssql_to_postgres_status})\n",
"statusDF"
]
},
{
"cell_type": "markdown",
"id": "1386d002-3dc6-4b88-9288-45958cfc7c7a",
"metadata": {},
"source": [
"## Step 12: Validate row counts of migrated tables from MSSQL to PostgreSQL"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "c80e8290-0c41-4fbf-ad93-6ec04de5d0da",
"metadata": {},
"outputs": [],
"source": [
"mssql_row_count = []\n",
"postgres_row_count = []"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "ae6e1f5a-8e58-4998-b8c7-50d26ea1433e",
"metadata": {},
"outputs": [],
"source": [
"# get mssql table counts\n",
"DB = sqlalchemy.create_engine(\n",
" sqlalchemy.engine.url.URL.create(\n",
" drivername=PYMSSQL_DRIVER,\n",
" username=MSSQL_USERNAME,\n",
" password=MSSQL_PASSWORD,\n",
" database=MSSQL_DATABASE,\n",
" host=MSSQL_HOST,\n",
" port=MSSQL_PORT\n",
" )\n",
" )\n",
"with DB.connect() as conn:\n",
" for table in SQLTABLE_LIST:\n",
" results = conn.execute(text(\"select count(*) from {}\".format(table))).fetchall()\n",
" for row in results:\n",
" mssql_row_count.append(row[0])"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "d2f7c149-df59-4df0-87ec-7a2058503bdd",
"metadata": {},
"outputs": [],
"source": [
"import psycopg2\n",
"postgresDB = psycopg2.connect(\n",
" user=POSTGRES_USERNAME,\n",
" password=POSTGRES_PASSWORD,\n",
" dbname=POSTGRES_DATABASE,\n",
" host=POSTGRES_HOST,\n",
" port=POSTGRES_PORT\n",
" )\n",
"\n",
"conn=postgresDB.cursor()\n",
"for table in SQLTABLE_LIST:\n",
" conn.execute(text('''select count(*) from {}'''.format(table)))\n",
" results = conn.fetchall()\n",
" for row in results:\n",
" postgres_row_count.append(row[0])\n",
"\n",
"conn.close()"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "b48ac8aa-1843-4144-b6bc-6ab992b61ac9",
"metadata": {},
"outputs": [],
"source": [
"statusDF['mssql_row_count'] = mssql_row_count \n",
"statusDF['postgres_row_count'] = postgres_row_count \n",
"statusDF"
]
}
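,
{
"cell_type": "code",
"execution_count": null,
"id": "row-count-mismatch-check",
"metadata": {},
"outputs": [],
"source": [
"# Optional validation helper: show only the tables whose source and target row\n",
"# counts differ; an empty result means all migrated tables match.\n",
"statusDF[statusDF['mssql_row_count'] != statusDF['postgres_row_count']]"
]
}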
],
"metadata": {
"environment": {
"kernel": "python3",
"name": "common-cpu.m95",
"type": "gcloud",
"uri": "gcr.io/deeplearning-platform-release/base-cpu:m95"
},
"kernelspec": {
"display_name": "Python 3 (ipykernel) (Local)",
"language": "python",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.9.9"
},
"vscode": {
"interpreter": {
"hash": "31f2aee4e71d21fbe5cf8b01ff0e069b9275f58929596ceb00d14d90e3e16cd6"
}
}
},
"nbformat": 4,
"nbformat_minor": 5
}