In [None]:
# Copyright 2022 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

# MSSQL to PostgreSQL Migration
<table align="left">
<td>
    <a href="https://colab.research.google.com/github/GoogleCloudPlatform/dataproc-templates/blob/main/notebooks/mssql2postgresql/mssql-to-postgres-notebook.ipynb">
        <img src="../images/colab-logo-32px.png" alt="Colab logo" />Run in Colab
    </a>
</td>
<td>
    <a href="https://console.cloud.google.com/vertex-ai/colab/import/https:%2F%2Fraw.githubusercontent.com%2FGoogleCloudPlatform%2Fdataproc-templates%2Fmain%2Fnotebooks%2Fmssql2postgresql%2Fmssql-to-postgres-notebook.ipynb">
        <img src="../images/colab-enterprise-logo-32px.png" alt="GCP Colab Enterprise logo" />Run in Colab Enterprise
    </a>
</td>
<td>
    <a href="https://github.com/GoogleCloudPlatform/dataproc-templates/blob/main/notebooks/mssql2postgresql/mssql-to-postgres-notebook.ipynb">
        <img src="../images/github-logo-32px.png" alt="GitHub logo" />View on GitHub
    </a>
</td>
<td>
    <a href="https://console.cloud.google.com/vertex-ai/workbench/deploy-notebook?download_url=https://raw.githubusercontent.com/GoogleCloudPlatform/dataproc-templates/main/notebooks/mssql2postgresql/mssql-to-postgres-notebook.ipynb">
        <img src="../images/vertexai.jpg" alt="Vertex AI logo" />Open in Vertex AI Workbench
    </a>
</td>
</table>

#### References
- [DataprocPySparkBatchOp reference](https://google-cloud-pipeline-components.readthedocs.io/en/google-cloud-pipeline-components-1.0.0/google_cloud_pipeline_components.experimental.dataproc.html)
- [Kubeflow SDK Overview](https://www.kubeflow.org/docs/components/pipelines/sdk/sdk-overview/)
- [Dataproc Serverless in Vertex AI Pipelines tutorial](https://github.com/GoogleCloudPlatform/vertex-ai-samples/blob/main/notebooks/community/ml_ops/stage3/get_started_with_dataproc_serverless_pipeline_components.ipynb)
- [Build a Vertex AI Pipeline](https://cloud.google.com/vertex-ai/docs/pipelines/build-pipeline)

This notebook is built to run in a Vertex AI User-Managed Notebook using the default Compute Engine Service Account.
Check the Dataproc Serverless in Vertex AI Pipelines tutorial linked above to learn how to set up a different Service Account.
#### Permissions
Make sure that the service account used to run the notebook has the following roles:
- roles/aiplatform.serviceAgent
- roles/aiplatform.customCodeServiceAgent
- roles/storage.objectCreator
- roles/storage.objectViewer
- roles/dataproc.editor
- roles/dataproc.worker

## Contact
Share your feedback, ideas, and thoughts using the [feedback-form](https://forms.gle/XXCJeWeCJJ9fNLQS6)  
Questions, issues, and comments should be directed to dataproc-templates-support-external@googlegroups.com

## Step 1: Install Libraries
#### Run Step 1 one time for each new notebook instance

In [None]:
!pip3 install pymssql SQLAlchemy
!pip3 install --upgrade google-cloud-pipeline-components kfp --user -q
!pip3 install psycopg2

#### Once you've installed the additional packages, you may need to restart the notebook kernel so it can find the packages.

Uncomment and run this cell if you installed anything with the commands above.

In [None]:
# Restarts the kernel so newly installed packages can be imported.
# Skipped when the IS_TESTING environment variable is set.
# import os
# import IPython
# if not os.getenv("IS_TESTING"):
#     app = IPython.Application.instance()
#     app.kernel.do_shutdown(True)

## Step 2: Import Libraries

In [None]:
import google.cloud.aiplatform as aiplatform
from kfp import dsl
from kfp import compiler
from datetime import datetime
import time
import copy
import json
import pandas as pd

try:
    from google_cloud_pipeline_components.experimental.dataproc import DataprocPySparkBatchOp
except ModuleNotFoundError:
    from google_cloud_pipeline_components.v1.dataproc import DataprocPySparkBatchOp
    
import sqlalchemy
from sqlalchemy import text
import pymssql
import math
from pathlib import Path
import os

## Step 3: Assign Parameters

### Step 3.1 Common Parameters
##### PROJECT : GCP project-id
##### REGION : GCP region
##### GCS_STAGING_LOCATION : Cloud Storage staging location to be used for this notebook to store artifacts
##### SUBNET : VPC subnet
##### JARS : list of jars. This notebook requires the MSSQL and PostgreSQL JDBC connector jars in addition to the Dataproc Templates package (JARS is set in Step 3.4)
##### MAX_PARALLELISM : Maximum number of tables migrated in parallel; the value used in this notebook is 5
##### DATAPROC_SERVICE_ACCOUNT : Service account used to run the Dataproc Serverless jobs

In [None]:
# Common GCP settings - replace every <placeholder> before running.

# GCP project id and region used for the Dataproc and Vertex AI jobs
PROJECT = "<project-id>"
REGION = "<region>"

# Cloud Storage folder for the package, jars and pipeline artifacts.
# Sub-paths are appended with "/", so leave off any trailing slash.
GCS_STAGING_LOCATION = "<gs://bucket/[folder]>"

# VPC subnetwork the Dataproc Serverless batches run in
SUBNET = "<projects/{project}/regions/{region}/subnetworks/{subnet}>"

# Maximum number of tables migrated in parallel
MAX_PARALLELISM = 5

# Service account for the Dataproc Serverless batches, eg: test@project_id.iam.gserviceaccount.com
DATAPROC_SERVICE_ACCOUNT = ""


### Step 3.2 MSSQL Parameters
#### MSSQL_HOST : MSSQL instance ip address
#### MSSQL_PORT : MSSQL instance port
#### MSSQL_USERNAME : MSSQL username
#### MSSQL_PASSWORD : MSSQL password
#### MSSQL_DATABASE : name of database that you want to migrate
#### MSSQLTABLE_LIST : list of tables you want to migrate, each written as 'schema.table', eg: ['dbo.table1','dbo.table2']; provide an empty list to migrate the whole database, eg: []
#### NUMBER_OF_PARTITIONS : The number of partitions used for parallelism in table reading and writing. It is not set here: it is calculated for large tables in Step 8, and the same value is used for both the input and output JDBC connection

In [None]:
# MSSQL (source) connection settings.
# NOTE: avoid saving real credentials in the notebook (e.g. use getpass.getpass()).
MSSQL_HOST = "<host>"
MSSQL_PORT = "<port>"
MSSQL_USERNAME = "<username>"
MSSQL_PASSWORD = "<password>"
MSSQL_DATABASE = "<database>"

# Leave empty to migrate the complete database, else list tables as ['schema.table1','schema.table2']
MSSQLTABLE_LIST = []

### Step 3.3 PostgreSQL Parameters
#### POSTGRES_HOST : PostgreSQL instance ip address
#### POSTGRES_PORT : PostgreSQL instance port
#### POSTGRES_USERNAME : PostgreSQL username
#### POSTGRES_PASSWORD : PostgreSQL password
#### POSTGRES_DATABASE : name of the PostgreSQL database that the tables are migrated into
#### JDBCTOJDBC_OUTPUT_MODE : Output write mode (one of: append,overwrite,ignore,errorifexists) (Defaults to overwrite)
#### JDBCTOJDBC_OUTPUT_BATCH_SIZE : JDBC output batch size. Default set to 1000

In [None]:
# PostgreSQL (target) connection settings.
# NOTE: avoid saving real credentials in the notebook (e.g. use getpass.getpass()).
POSTGRES_HOST = "<host>"
POSTGRES_PORT = "<port>"
POSTGRES_USERNAME = "<username>"
POSTGRES_PASSWORD = "<password>"
POSTGRES_DATABASE = "<database>"

# One of append/overwrite/ignore/errorifexists; "overwrite" is the documented default.
# (Previously the garbled placeholder "<modeoverwrite>", which is not a valid mode.)
JDBCTOJDBC_OUTPUT_MODE = "overwrite"
# Rows per JDBC batch insert, passed to the template as a string
JDBCTOJDBC_OUTPUT_BATCH_SIZE = "1000"

### Step 3.4 Notebook Configuration Parameters
#### Below variables should not be changed unless required

In [None]:
# By default, use the "python" folder two levels above the notebook directory
# (<repo>/notebooks/mssql2postgresql -> <repo>/python).
cur_path = Path.cwd()
WORKING_DIRECTORY = os.path.join(cur_path.parent.parent, "python")

# If the path above is wrong, set WORKING_DIRECTORY to the full path of the
# python folder of your cloned dataproc-templates repository, eg:
# WORKING_DIRECTORY = "/home/jupyter/dataproc-templates/python/"
print(WORKING_DIRECTORY)

In [None]:
# SQLAlchemy dialect+driver used for the metadata queries in this notebook
PYMSSQL_DRIVER = "mssql+pymssql"

# Source (SQL Server) JDBC settings passed to the JDBCTOJDBC template
JDBC_INPUT_DRIVER = "com.microsoft.sqlserver.jdbc.SQLServerDriver"
JDBC_INPUT_URL = "jdbc:sqlserver://{host}:{port};databaseName={db};user={user};password={pwd}".format(
    host=MSSQL_HOST, port=MSSQL_PORT, db=MSSQL_DATABASE, user=MSSQL_USERNAME, pwd=MSSQL_PASSWORD)
# NOTE(review): MAIN_CLASS is not referenced elsewhere in this notebook.
MAIN_CLASS = "com.google.cloud.dataproc.templates.main.DataProcTemplate"

# Target (PostgreSQL) JDBC settings passed to the JDBCTOJDBC template
JDBC_OUTPUT_DRIVER = "org.postgresql.Driver"
JDBC_OUTPUT_URL = ("jdbc:postgresql://{host}:{port}/{db}?user={user}&password={pwd}"
                   "&reWriteBatchedInserts=True").format(
    host=POSTGRES_HOST, port=POSTGRES_PORT, db=POSTGRES_DATABASE, user=POSTGRES_USERNAME, pwd=POSTGRES_PASSWORD)
# NOTE(review): both URLs embed credentials verbatim; passwords containing
# ';', '&' or '=' would presumably need escaping - confirm.

# Egg path (relative to WORKING_DIRECTORY) produced by setup.py in Step 6
PACKAGE_EGG_FILE = "dist/dataproc_templates_distribution.egg"

# Cloud Storage locations derived from GCS_STAGING_LOCATION
PIPELINE_ROOT = GCS_STAGING_LOCATION + "/pipeline_root/dataproc_pyspark"
MAIN_PYTHON_FILE = GCS_STAGING_LOCATION + "/main.py"
PYTHON_FILE_URIS = [GCS_STAGING_LOCATION + "/dataproc_templates_distribution.egg"]

# Do not change this parameter unless you want to refer below JARS from new location
JARS = [
    GCS_STAGING_LOCATION + "/jars/mssql-jdbc-6.4.0.jre8.jar",
    GCS_STAGING_LOCATION + "/jars/postgresql-42.2.6.jar",
]

## Step 4: Generate MSSQL Table List
This step creates the list of tables for migration. If MSSQLTABLE_LIST is left empty, all tables in MSSQL_DATABASE are listed for migration; otherwise, the provided list is used.

In [None]:
# Work on a copy: appending the discovered tables to an alias would silently
# modify MSSQLTABLE_LIST as well.
SQLTABLE_LIST = list(MSSQLTABLE_LIST)
if len(SQLTABLE_LIST) == 0:
    DB = sqlalchemy.create_engine(
            sqlalchemy.engine.url.URL.create(
                drivername=PYMSSQL_DRIVER,
                username=MSSQL_USERNAME,
                password=MSSQL_PASSWORD,
                database=MSSQL_DATABASE,
                host=MSSQL_HOST,
                port=MSSQL_PORT
              )
            )
    with DB.connect() as conn:
        print("connected to database")
        # INFORMATION_SCHEMA.TABLES also lists views; migrate base tables only,
        # in a deterministic order.
        results = conn.execute(text(
            "select TABLE_SCHEMA,TABLE_NAME from INFORMATION_SCHEMA.Tables "
            "where TABLE_TYPE = 'BASE TABLE' order by TABLE_SCHEMA, TABLE_NAME"
        )).fetchall()
        print("Total Tables = ", len(results))
        # Tables are tracked as "schema.table" throughout the notebook.
        SQLTABLE_LIST.extend(schema + "." + name for schema, name in results)

print("list of tables for migration :")
print(SQLTABLE_LIST)

## Step 5: Get Primary Keys for Partitioning the Tables
This step fetches the primary keys from MSSQL_DATABASE for the tables listed for migration

In [None]:
# Maps "schema.table" -> comma-separated primary-key columns in key order
# ("" when the table has no primary key).
SQL_TABLE_PRIMARY_KEYS = {}
DB = sqlalchemy.create_engine(
            sqlalchemy.engine.url.URL.create(
                drivername=PYMSSQL_DRIVER,
                username=MSSQL_USERNAME,
                password=MSSQL_PASSWORD,
                database=MSSQL_DATABASE,
                host=MSSQL_HOST,
                port=MSSQL_PORT
              )
            )
# The join also matches on the constraint schema because SQL Server constraint
# names are only unique within a schema. Table/schema names are bound as
# parameters instead of being formatted into the SQL text.
pk_query = text(
    "SELECT K.COLUMN_NAME "
    "FROM INFORMATION_SCHEMA.TABLE_CONSTRAINTS T "
    "JOIN INFORMATION_SCHEMA.KEY_COLUMN_USAGE K "
    "ON K.CONSTRAINT_NAME = T.CONSTRAINT_NAME AND K.CONSTRAINT_SCHEMA = T.CONSTRAINT_SCHEMA "
    "WHERE K.TABLE_NAME = :table_name AND K.TABLE_SCHEMA = :table_schema "
    "AND T.CONSTRAINT_TYPE = 'PRIMARY KEY' "
    "ORDER BY K.ORDINAL_POSITION"
)
with DB.connect() as conn:
    for table in SQLTABLE_LIST:
        print(table)
        if "." not in table:
            raise ValueError("Expected table name in 'schema.table' form, got {!r}".format(table))
        table_schema, table_name = table.split(".")[:2]
        results = conn.execute(pk_query, {"table_name": table_name, "table_schema": table_schema}).fetchall()
        SQL_TABLE_PRIMARY_KEYS[table] = ",".join(row[0] for row in results)
            

In [None]:
# Look each table's keys up by name so every row pairs a table with its own
# keys, independent of dict ordering or duplicate table entries.
pkDF = pd.DataFrame({
    "table": SQLTABLE_LIST,
    "primary_keys": [SQL_TABLE_PRIMARY_KEYS.get(table, "") for table in SQLTABLE_LIST],
})
print("Below are identified primary keys for migrating mssql table to postgres:")
pkDF

## Step 6: Download JAR Files, Build the Package and Upload to GCS
#### Run Step 6 one time for each new notebook instance

In [None]:
# Run the remaining Step 6 commands from WORKING_DIRECTORY (the repository's python folder).
%cd $WORKING_DIRECTORY

#### Get JDBC Connector jars

In [None]:
%%bash
# Download the PostgreSQL and MSSQL JDBC driver jars into the working directory.
# Their file names must match the ones uploaded below and referenced by JARS (Step 3.4).
wget https://jdbc.postgresql.org/download/postgresql-42.2.6.jar
wget https://repo1.maven.org/maven2/com/microsoft/sqlserver/mssql-jdbc/6.4.0.jre8/mssql-jdbc-6.4.0.jre8.jar

#### Build Dataproc Templates python package

In [None]:
# Build the Dataproc Templates python package as an egg at $PACKAGE_EGG_FILE.
! python ./setup.py bdist_egg --output=$PACKAGE_EGG_FILE

#### Copy main.py, the package and the JAR files to GCS_STAGING_LOCATION

In [None]:
# Upload the entry point, the package egg and the JDBC jars to the paths
# referenced by MAIN_PYTHON_FILE, PYTHON_FILE_URIS and JARS.
! gsutil cp main.py $GCS_STAGING_LOCATION/
! gsutil cp -r $PACKAGE_EGG_FILE $GCS_STAGING_LOCATION/
! gsutil cp mssql-jdbc-6.4.0.jre8.jar $GCS_STAGING_LOCATION/jars/mssql-jdbc-6.4.0.jre8.jar
! gsutil cp postgresql-42.2.6.jar $GCS_STAGING_LOCATION/jars/postgresql-42.2.6.jar


## Step 7: Calculate Parallel Jobs for MSSQL to PostgreSQL
This step uses the MAX_PARALLELISM parameter to split the tables into batches of at most MAX_PARALLELISM tables; the tables in one batch are migrated in parallel

In [None]:
# Split the tables into consecutive batches of at most MAX_PARALLELISM tables.
# Each batch is migrated by one pipeline run in Step 10.
if MAX_PARALLELISM < 1:
    # A non-positive batch size would never consume the list.
    raise ValueError("MAX_PARALLELISM must be a positive integer, got {}".format(MAX_PARALLELISM))
JOB_LIST = [
    SQLTABLE_LIST[start:start + MAX_PARALLELISM]
    for start in range(0, len(SQLTABLE_LIST), MAX_PARALLELISM)
]
print("list of tables for execution : ")
print(JOB_LIST)


## Step 8: Get Row Count of Tables and Identify Partition Columns
#### This step uses the PARTITION_THRESHOLD parameter (default value is 1 million): any table with more rows than PARTITION_THRESHOLD will be partitioned based on its Primary Keys
#### Get Primary keys for all tables to be migrated and find an integer column to partition on

In [None]:
# "Maximum Row Count Threshold for a Table": larger tables are read in partitions
PARTITION_THRESHOLD = 10 ** 6

# Filled by the following cells:
# table -> [partition column, lower bound, upper bound, number of partitions]
CHECK_PARTITION_COLUMN_LIST = {}
# Dataproc batch IDs submitted by the migration pipeline
mssql_to_postgres_jobs = []

In [None]:
def _bracket_quote(identifier):
    """Quote a SQL Server identifier with [brackets], doubling any embedded ']'."""
    return "[" + identifier.replace("]", "]]") + "]"


# Row count of one user table from the sysindexes/sysobjects views. The schema
# filter keeps same-named tables in different schemas apart.
row_count_query = text(
    "SELECT i.rowcnt FROM sysindexes AS i INNER JOIN sysobjects AS o ON i.id = o.id "
    "WHERE i.indid < 2 AND OBJECTPROPERTY(o.id, 'IsMSShipped') = 0 "
    "AND o.name = :table_name AND OBJECT_SCHEMA_NAME(o.id) = :table_schema"
)
column_type_query = text(
    "SELECT DATA_TYPE FROM INFORMATION_SCHEMA.COLUMNS "
    "WHERE TABLE_SCHEMA = :table_schema AND TABLE_NAME = :table_name AND COLUMN_NAME = :column_name"
)

with DB.connect() as conn:
    for table in SQLTABLE_LIST:
        table_schema, table_name = table.split(".")[:2]
        names = {"table_schema": table_schema, "table_name": table_name}
        results = conn.execute(row_count_query, names).fetchall()
        primary_keys = SQL_TABLE_PRIMARY_KEYS.get(table, "")
        # Only tables above the threshold that have a primary key are partitioned.
        if not results or results[0][0] <= int(PARTITION_THRESHOLD) or not primary_keys:
            continue
        row_count = results[0][0]
        for column in primary_keys.split(","):
            results_datatype = conn.execute(column_type_query, dict(names, column_name=column)).fetchall()
            if not results_datatype or results_datatype[0][0] not in ("int", "bigint"):
                continue
            lowerbound, upperbound = conn.execute(text(
                "SELECT MIN({0}), MAX({0}) FROM {1}.{2}".format(
                    _bracket_quote(column), _bracket_quote(table_schema), _bracket_quote(table_name))
            )).fetchone()
            if lowerbound is None or upperbound is None:
                continue
            # One partition per PARTITION_THRESHOLD rows. Dividing the key range
            # by a row count instead gives huge numbers of mostly empty
            # partitions for sparse keys.
            numberPartitions = max(1, math.ceil(row_count / PARTITION_THRESHOLD))
            CHECK_PARTITION_COLUMN_LIST[table] = [column, lowerbound, upperbound, numberPartitions]
            # Use the first integer primary-key column (in key order) and stop.
            break

print(CHECK_PARTITION_COLUMN_LIST)

## Step 9: Create Source Schemas in PostgreSQL

In [None]:
import psycopg2
postgresDB = psycopg2.connect(
                user=POSTGRES_USERNAME,
                password=POSTGRES_PASSWORD,
                dbname=POSTGRES_DATABASE,
                host=POSTGRES_HOST,
                port=POSTGRES_PORT
            )
postgresDB.autocommit = True
try:
    with postgresDB.cursor() as cursor:
        # One CREATE SCHEMA per distinct source schema. The statement is a plain
        # string: psycopg2's cursor.execute() does not accept SQLAlchemy text()
        # objects and raises TypeError for them.
        # NOTE(review): the schema name is left unquoted, presumably so PostgreSQL
        # folds its case the same way as the unquoted output table name passed to
        # the JDBCTOJDBC template - confirm.
        for schema in sorted({table.split(".")[0] for table in SQLTABLE_LIST}):
            cursor.execute('''CREATE SCHEMA IF NOT EXISTS {};'''.format(schema))
finally:
    # Close the connection as well, not just the cursor.
    postgresDB.close()

## Step 10: Execute Pipeline to Migrate tables from MSSQL to PostgreSQL

In [None]:
def migrate_mssql_to_postgres(EXECUTION_LIST):
    EXECUTION_LIST = EXECUTION_LIST
    aiplatform.init(project=PROJECT,staging_bucket=GCS_STAGING_LOCATION)
    
    @dsl.pipeline(
        name="python-mssql-to-postgres-pyspark",
        description="Pipeline to get data from mssql to postgres",
    )
    
    def pipeline(
        PROJECT_ID: str = PROJECT,
        LOCATION: str = REGION,
        MAIN_PYTHON_CLASS: str = MAIN_PYTHON_FILE,
        PYTHON_FILE_URIS: list = PYTHON_FILE_URIS,
        JAR_FILE_URIS: list = JARS,
        SUBNETWORK_URI: str = SUBNET,
        SERVICE_ACCOUNT: str = DATAPROC_SERVICE_ACCOUNT
        ):
        for table in EXECUTION_LIST:
            BATCH_ID = "mssql2pg-{}-{}".format(table,datetime.now().strftime("%s")).replace('.','-').replace('_','-').lower()
            mssql_to_postgres_jobs.append(BATCH_ID)
            
            
            if table in CHECK_PARTITION_COLUMN_LIST.keys():
                TEMPLATE_SPARK_ARGS = [
                "--template=JDBCTOJDBC",
                "--jdbctojdbc.input.url={}".format(JDBC_INPUT_URL),
                "--jdbctojdbc.input.driver={}".format(JDBC_INPUT_DRIVER),
                "--jdbctojdbc.input.table={}".format(table),
                "--jdbctojdbc.output.url={}".format(JDBC_OUTPUT_URL),
                "--jdbctojdbc.output.driver={}".format(JDBC_OUTPUT_DRIVER),
                "--jdbctojdbc.output.table={}".format(table),
                "--jdbctojdbc.input.partitioncolumn={}".format(CHECK_PARTITION_COLUMN_LIST[table][0]),
                "--jdbctojdbc.input.lowerbound={}".format(CHECK_PARTITION_COLUMN_LIST[table][1]),
                "--jdbctojdbc.input.upperbound={}".format(CHECK_PARTITION_COLUMN_LIST[table][2]),
                "--jdbctojdbc.numpartitions={}".format(CHECK_PARTITION_COLUMN_LIST[table][3]),
                "--jdbctojdbc.output.mode={}".format(JDBCTOJDBC_OUTPUT_MODE),
                "--jdbctojdbc.output.batch.size={}".format(JDBCTOJDBC_OUTPUT_BATCH_SIZE)
                ]
            else:
                TEMPLATE_SPARK_ARGS = [
                "--template=JDBCTOJDBC",
                "--jdbctojdbc.input.url={}".format(JDBC_INPUT_URL),
                "--jdbctojdbc.input.driver={}".format(JDBC_INPUT_DRIVER),
                "--jdbctojdbc.input.table={}".format(table),
                "--jdbctojdbc.output.url={}".format(JDBC_OUTPUT_URL),
                "--jdbctojdbc.output.driver={}".format(JDBC_OUTPUT_DRIVER),
                "--jdbctojdbc.output.table={}".format(table),
                "--jdbctojdbc.output.mode={}".format(JDBCTOJDBC_OUTPUT_MODE),
                "--jdbctojdbc.output.batch.size={}".format(JDBCTOJDBC_OUTPUT_BATCH_SIZE)
                ]
            

            _ = DataprocPySparkBatchOp(
                project=PROJECT_ID,
                location=LOCATION,
                batch_id=BATCH_ID,
                main_python_file_uri=MAIN_PYTHON_CLASS,
                jar_file_uris=JAR_FILE_URIS,
                python_file_uris=PYTHON_FILE_URIS,
                subnetwork_uri=SUBNETWORK_URI,
                service_account=SERVICE_ACCOUNT,
                runtime_config_version="1.1", # issue 665
                args=TEMPLATE_SPARK_ARGS
                )
            time.sleep(3)

    compiler.Compiler().compile(pipeline_func=pipeline, package_path="pipeline.json")

    pipeline = aiplatform.PipelineJob(
            display_name="pipeline",
        template_path="pipeline.json",
        pipeline_root=PIPELINE_ROOT,
        enable_caching=False,
        )
    # run() method has an optional parameter `service_account` which you can pass if you want to run pipeline using
    # specific service account instead of default service account 
    # eg. pipeline.run(service_account='test@project_id.iam.gserviceaccount.com')
    pipeline.run()

In [None]:
# Each JOB_LIST batch is migrated by its own pipeline run, submitted one after another.
for tables_in_batch in JOB_LIST:
    print(tables_in_batch)
    migrate_mssql_to_postgres(tables_in_batch)

## Step 11: Get status for tables migrated from MSSQL to PostgreSQL

In [None]:
def get_bearer_token():
    """Return an OAuth2 access token for the application default credentials.

    Returns:
        A 2-tuple: ``(token, 200)`` on success, or ``(error_message, 500)`` when
        no token could be obtained. Callers index the result as
        ``token[0]`` / ``token[1]``, so every path returns a tuple.
    """
    try:
        # Imported here so the function does not depend on what the global
        # names `google` / `requests` are bound to when it is called.
        import google.auth
        from google.auth.transport.requests import Request

        # Scope for Google Cloud APIs (used for the Dataproc REST calls)
        CREDENTIAL_SCOPES = ["https://www.googleapis.com/auth/cloud-platform"]

        credentials, _ = google.auth.default(scopes=CREDENTIAL_SCOPES)

        # Refresh so credentials.token is populated
        credentials.refresh(Request())

        token = credentials.token
        if token:
            return (token, 200)
        return ("Bearer token not generated", 500)
    except Exception as error:
        return ("Bearer token not generated. Error : {}".format(error), 500)

In [None]:
from google.auth.transport import requests
import google

token = get_bearer_token()
# token is normally a (value, status) pair; status 200 means value is the bearer token.
print("Bearer token generated" if token[1] == 200 else token)

In [None]:
# Imported under an alias so the global name `requests` (bound to
# google.auth.transport.requests in the previous cell) is not shadowed.
import requests as http_requests

mssql_to_postgres_status = []
job_status_url = "https://dataproc.googleapis.com/v1/projects/{}/locations/{}/batches/{}"
# The same token is used for every batch, so build the headers once.
headers = {
  'Content-Type': 'application/json; charset=UTF-8',
  'Authorization': "Bearer " + token[0]
}
for job in mssql_to_postgres_jobs:
    url = job_status_url.format(PROJECT, REGION, job)
    response = http_requests.get(url, headers=headers, timeout=60)  # seconds
    try:
        state = response.json().get("state")
    except ValueError:
        state = None
    # Error responses (unknown batch, expired token, ...) carry no "state";
    # record the HTTP status instead of failing with a KeyError.
    mssql_to_postgres_status.append(state or "UNKNOWN (HTTP {})".format(response.status_code))


In [None]:
# One row per table with its batch ID and state.
# Assumes mssql_to_postgres_jobs holds exactly one batch ID per table (migration cell run once).
statusDF = pd.DataFrame(data=dict(
    table=SQLTABLE_LIST,
    mssql_to_postgres_job=mssql_to_postgres_jobs,
    mssql_to_postgres_status=mssql_to_postgres_status,
))
statusDF

## Step 12: Validate row counts of migrated tables from MSSQL to PostgreSQL

In [None]:
# Per-table row counts (same order as SQLTABLE_LIST), filled by the next two cells
mssql_row_count, postgres_row_count = [], []

In [None]:
# get mssql table counts
DB = sqlalchemy.create_engine(
            sqlalchemy.engine.url.URL.create(
                drivername=PYMSSQL_DRIVER,
                username=MSSQL_USERNAME,
                password=MSSQL_PASSWORD,
                database=MSSQL_DATABASE,
                host=MSSQL_HOST,
                port=MSSQL_PORT
              )
            )
with DB.connect() as conn:
    for table in SQLTABLE_LIST:
        results = conn.execute(text("select count(*) from {}".format(table))).fetchall()
        for row in results:
            mssql_row_count.append(row[0])

In [None]:
import psycopg2
# Reset here so re-running this cell does not append a second set of counts.
postgres_row_count = []
postgresDB = psycopg2.connect(
                user=POSTGRES_USERNAME,
                password=POSTGRES_PASSWORD,
                dbname=POSTGRES_DATABASE,
                host=POSTGRES_HOST,
                port=POSTGRES_PORT
            )
try:
    with postgresDB.cursor() as cursor:
        for table in SQLTABLE_LIST:
            # Plain SQL string: psycopg2's cursor.execute() raises TypeError for
            # SQLAlchemy text() objects.
            cursor.execute('''select count(*) from {}'''.format(table))
            postgres_row_count.append(cursor.fetchone()[0])
finally:
    # Close the connection as well, not just the cursor.
    postgresDB.close()

In [None]:
# Show the MSSQL and PostgreSQL row counts next to each table's job status.
statusDF = statusDF.assign(
    mssql_row_count=mssql_row_count,
    postgres_row_count=postgres_row_count,
)
statusDF