In [None]:
# Copyright 2022 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

# MSSQL to BigQuery Migration
<table align="left">
<td>
    <a href="https://colab.research.google.com/github/GoogleCloudPlatform/dataproc-templates/blob/main/notebooks/mssql2bq/mssql-to-bigquery-notebook.ipynb">
        <img src="../images/colab-logo-32px.png" alt="Colab logo" />Run in Colab
    </a>
</td>
<td>
    <a href="https://console.cloud.google.com/vertex-ai/colab/import/https:%2F%2Fraw.githubusercontent.com%2FGoogleCloudPlatform%2Fdataproc-templates%2Fmain%2Fnotebooks%2Fmssql2bq%2Fmssql-to-bigquery-notebook.ipynb">
        <img src="../images/colab-enterprise-logo-32px.png" alt="GCP Colab Enterprise logo" />Run in Colab Enterprise
    </a>
</td>
<td>
    <a href="https://github.com/GoogleCloudPlatform/dataproc-templates/blob/main/notebooks/mssql2bq/mssql-to-bigquery-notebook.ipynb">
        <img src="../images/github-logo-32px.png" alt="GitHub logo" />View on GitHub
    </a>
</td>
<td>
    <a href="https://console.cloud.google.com/vertex-ai/workbench/deploy-notebook?download_url=https://raw.githubusercontent.com/GoogleCloudPlatform/dataproc-templates/main/notebooks/mssql2bq/mssql-to-bigquery-notebook.ipynb">
        <img src="../images/vertexai.jpg" alt="Vertex AI logo" />Open in Vertex AI Workbench
    </a>
</td>
</table>

## References
* [DataprocPySparkBatchOp reference](https://google-cloud-pipeline-components.readthedocs.io/en/google-cloud-pipeline-components-1.0.0/google_cloud_pipeline_components.experimental.dataproc.html) 
* [Kubeflow SDK Overview](https://www.kubeflow.org/docs/components/pipelines/sdk/sdk-overview/)
* [Dataproc Serverless in Vertex AI Pipelines tutorial](https://github.com/GoogleCloudPlatform/vertex-ai-samples/blob/main/notebooks/community/ml_ops/stage3/get_started_with_dataproc_serverless_pipeline_components.ipynb)
* [Build a Vertex AI Pipeline](https://cloud.google.com/vertex-ai/docs/pipelines/build-pipeline)

This notebook is built to run in a Vertex AI User-Managed Notebook using the default Compute Engine Service Account. Check the Dataproc Serverless in Vertex AI Pipelines tutorial linked above to learn how to set up a different Service Account.

## Permissions
Make sure that the service account used to run the notebook has the following roles:

* roles/aiplatform.serviceAgent
* roles/aiplatform.customCodeServiceAgent
* roles/storage.objectCreator
* roles/storage.objectViewer
* roles/dataproc.editor
* roles/dataproc.worker

# Step 1: Install Libraries
<div class="alert alert-block alert-info">
<b>NOTE: </b>Run Step 1 one time for each new notebook instance</div>

In [None]:
!pip3 install pymssql SQLAlchemy
!pip3 install --upgrade google-cloud-pipeline-components kfp -q
!pip3 install google.cloud.aiplatform

### Once you've installed the additional packages, you may need to restart the notebook kernel so it can find the packages.

Uncomment & Run this cell if you have installed anything from above commands

In [None]:
# import os
# import IPython
# if not os.getenv("IS_TESTING"):
#     app = IPython.Application.instance()
#     app.kernel.do_shutdown(True)

# Step 2: Import Libraries

In [None]:
import google.cloud.aiplatform as aiplatform
import sys, os
from kfp import dsl
from kfp import compiler
from datetime import datetime
import time
import copy
import json
import pandas as pd

try:
    from google_cloud_pipeline_components.experimental.dataproc import DataprocPySparkBatchOp
except ModuleNotFoundError:
    from google_cloud_pipeline_components.v1.dataproc import DataprocPySparkBatchOp
    
import sqlalchemy
from sqlalchemy import text
import pymssql
import math
from pathlib import Path

# Step 3: Assign Parameters

## Step 3.1 Common Parameters

PROJECT : GCP project-id

REGION : GCP region

GCS_STAGING_LOCATION : Cloud Storage staging location to be used for this notebook to store artifacts

SUBNET : VPC subnet

JARS : list of jars. This notebook requires the MSSQL JDBC connector jar and the Spark BigQuery connector jar in addition to the Dataproc template

MAX_PARALLELISM : Parameter for number of jobs to run in parallel default value is 5

SERVICE_ACCOUNT: Custom service account email to use for vertex ai pipeline and dataproc job with above mentioned permissions

MSSQL_TO_BIGQUERY_JOBS : List of Dataproc batch job IDs that will be created by the Vertex AI pipelines to migrate data from the source to BigQuery.

In [None]:
PROJECT = "<project-id>"
REGION = "<region>"
GCS_STAGING_LOCATION = "gs://<staging-bucket>"
SUBNET = "projects/<project-id>/regions/<region>/subnetworks/<subnet-name>"
MAX_PARALLELISM = 5 # default value is set to 5
SERVICE_ACCOUNT = ""
MSSQL_TO_BIGQUERY_JOBS = []

# Do not change this parameter unless you want to refer below JARS from new location
JARS = [GCS_STAGING_LOCATION + "/jars/mssql-jdbc-6.4.0.jre8.jar", GCS_STAGING_LOCATION + "/jars/spark-bigquery-with-dependencies_2.12-0.27.0.jar"]

# If SERVICE_ACCOUNT is not specified it will take the one attached to Notebook
if SERVICE_ACCOUNT == '':
    shell_output = !gcloud auth list 2>/dev/null
    SERVICE_ACCOUNT = shell_output[2].replace("*", "").strip()
    print("Service Account: ",SERVICE_ACCOUNT)
    


## Step 3.2 MSSQL Parameters

SQL_SERVER_HOST : MSSQL instance ip address

SQL_SERVER_PORT : MSSQL instance port

SQL_SERVER_USERNAME : MSSQL username

SQL_SERVER_PASSWORD : MSSQL password

SQL_SERVER_DATABASE : name of database that you want to migrate

SQL_SERVER_TABLE_LIST : list of tables you want to migrate eg ['schema.table1','schema.table2'] else provide an empty list for migration of specific schemas or the whole database eg : []

SQL_SERVER_SCHEMA_LIST : List of schemas. Use this if you'd like to migrate all tables associated with specific schemas, e.g. ['schema1','schema2']. Otherwise, leave this parameter empty, e.g. [].

<div class="alert alert-block alert-warning">
<b>NOTE: </b>Please ensure that SQL_SERVER_SCHEMA_LIST and SQL_SERVER_TABLE_LIST are not used simultaneously. Use only one of the two parameters at a time, or leave both of them empty to migrate the entire database.</div>

In [None]:
# Connection settings for the source SQL Server instance. They are used to
# build the SQLAlchemy engine in this notebook and are also embedded into
# JDBC_URL, which is handed to the Dataproc jobs.
SQL_SERVER_HOST = "<host-ip-address>"
SQL_SERVER_PORT = "1433"
SQL_SERVER_USERNAME = "<user-name>"
# NOTE: the password is stored in plain text here and is copied verbatim into
# JDBC_URL; avoid committing or sharing the notebook with a real value filled in.
SQL_SERVER_PASSWORD = "<password>"
SQL_SERVER_DATABASE = "<database-name>"
# The two lists below are mutually exclusive filters; Step 4 aborts when both are non-empty.
SQL_SERVER_TABLE_LIST=[] # leave list empty for migrating complete database else provide tables as ['dbo.table1','sys.table2']. If this parameter is not empty, leave SQL_SERVER_SCHEMA_LIST empty.
SQL_SERVER_SCHEMA_LIST=[] # leave list empty for migrating complete database else provide schema as ['schema1','schema2'] for migrating all tables in specific schemas. If this parameter is not empty, leave SQL_SERVER_TABLE_LIST empty.

## Step 3.3 Notebook Configuration Parameters

<div class="alert alert-block alert-warning">
<b>NOTE: </b>Below variables should not be changed unless required</div>

SQL_SERVER_DRIVER : MSSQL Driver

JDBC_DRIVER : JDBC driver class

JDBC_URL : MSSQL jdbc url

MAIN_CLASS : Dataproc Template Main Class

WORKING_DIRECTORY : Python working directory

PACKAGE_EGG_FILE : Dataproc Template distribution file

PIPELINE_ROOT : Path to Vertex AI pipeline artifacts

In [None]:
# SQLAlchemy dialect+driver used for the metadata queries run from this notebook.
SQL_SERVER_DRIVER = "mssql+pymssql"
# JDBC driver class and connection URL handed to the Dataproc jobs.
JDBC_DRIVER = "com.microsoft.sqlserver.jdbc.SQLServerDriver"
JDBC_URL = (
    f"jdbc:sqlserver://{SQL_SERVER_HOST}:{SQL_SERVER_PORT};"
    f"databaseName={SQL_SERVER_DATABASE};"
    f"user={SQL_SERVER_USERNAME};"
    f"password={SQL_SERVER_PASSWORD}"
)
MAIN_CLASS = "com.google.cloud.dataproc.templates.main.DataProcTemplate"
# File name of the packaged Dataproc Templates distribution built in Step 6.
PACKAGE_EGG_FILE = "dataproc_templates_distribution.egg"
# Cloud Storage prefix under which Vertex AI stores pipeline artifacts.
PIPELINE_ROOT = f"{GCS_STAGING_LOCATION}/pipeline_root/dataproc_pyspark"


In [None]:
# Locate the templates' "python" folder relative to where the notebook runs:
# two directory levels above the current working directory.
cur_path = Path.cwd()
repo_root = cur_path.parent.parent
WORKING_DIRECTORY = str(repo_root / 'python')

# If the path printed below is not correct, uncomment the next line and
# provide the complete path to the python folder of the dataproc
# templates repo which you cloned.

# WORKING_DIRECTORY = "/home/jupyter/dataproc-templates/python/"
print(WORKING_DIRECTORY)

# Step 4: Generate SQL SERVER Table List

This step creates list of tables for migration. 

* If SQL_SERVER_TABLE_LIST and SQL_SERVER_SCHEMA_LIST are kept empty, then all the tables in the SQL_SERVER_DATABASE are listed for migration.

* If SQL_SERVER_SCHEMA_LIST is non empty, then all tables associated with the mentioned schemas will be listed for migration

* If SQL_SERVER_TABLE_LIST is non empty, then the provided list of tables are selected for migration

In [None]:
# The schema filter and the explicit table list are mutually exclusive;
# stop here when both have been filled in.
both_filters_set = bool(SQL_SERVER_SCHEMA_LIST) and bool(SQL_SERVER_TABLE_LIST)
if both_filters_set:
    sys.exit("Please provide values for either SQL_SERVER_SCHEMA_LIST OR SQL_SERVER_TABLE_LIST. Non empty values for both the values at the same time are not accepted")

In [None]:
# Engine used by the later steps that query the source SQL Server database.
DB = sqlalchemy.create_engine(
    sqlalchemy.engine.url.URL.create(
        drivername=SQL_SERVER_DRIVER,
        username=SQL_SERVER_USERNAME,
        password=SQL_SERVER_PASSWORD,
        database=SQL_SERVER_DATABASE,
        host=SQL_SERVER_HOST,
        port=SQL_SERVER_PORT
    )
)

with DB.connect() as conn:
    print("connected to database")

    # Default to "nothing discovered": when SQL_SERVER_TABLE_LIST is already
    # populated (supplied by the user, or filled by a previous run of this cell)
    # the list is used as-is instead of failing on an undefined `results`.
    results = []

    if not SQL_SERVER_TABLE_LIST and not SQL_SERVER_SCHEMA_LIST: # Migrate all possible tables from database
        results = conn.execute(text('select TABLE_SCHEMA,TABLE_NAME from INFORMATION_SCHEMA.Tables')).fetchall()

    elif SQL_SERVER_SCHEMA_LIST and not SQL_SERVER_TABLE_LIST: # Only Migrate tables associated with the provided schema list
        results = conn.execute(text("select TABLE_SCHEMA,TABLE_NAME from INFORMATION_SCHEMA.Tables where TABLE_SCHEMA in ('{}');".format("','".join(SQL_SERVER_SCHEMA_LIST)))).fetchall()

    # Discovered tables are recorded as "schema.table".
    for row in results:
        SQL_SERVER_TABLE_LIST.append(row[0]+"."+row[1])

    print("Total Tables = ", len(SQL_SERVER_TABLE_LIST))
    print("list of tables for migration :")
    print(SQL_SERVER_TABLE_LIST)
        

# Step 5: Get Primary Keys for partitioning the tables

This step fetches primary key from SQL_SERVER_DATABASE for the tables listed for migration

In [None]:
# Maps "schema.table" -> comma-separated primary key column names
# ("" when the table has no primary key).
SQL_TABLE_PRIMARY_KEYS = {}

# Bound parameters keep schema/table names out of the SQL text. The join also
# matches on the constraint schema so that equally named constraints in
# different schemas are not mixed up, and the columns are returned in key order.
primary_key_query = text(
    "SELECT K.COLUMN_NAME "
    "FROM INFORMATION_SCHEMA.TABLE_CONSTRAINTS T "
    "JOIN INFORMATION_SCHEMA.KEY_COLUMN_USAGE K "
    "ON K.CONSTRAINT_NAME = T.CONSTRAINT_NAME "
    "AND K.CONSTRAINT_SCHEMA = T.CONSTRAINT_SCHEMA "
    "WHERE K.TABLE_NAME = :table_name "
    "AND K.TABLE_SCHEMA = :table_schema "
    "AND T.CONSTRAINT_TYPE = 'PRIMARY KEY' "
    "ORDER BY K.ORDINAL_POSITION"
)

with DB.connect() as conn:
    for table in SQL_SERVER_TABLE_LIST:
        # Entries are "schema.table"; split on the first dot only.
        table_schema, table_name = table.split(".", 1)
        key_rows = conn.execute(
            primary_key_query,
            {"table_name": table_name, "table_schema": table_schema},
        ).fetchall()
        SQL_TABLE_PRIMARY_KEYS[table] = ",".join(row[0] for row in key_rows)



In [None]:
# Look the keys up per table (instead of relying on dict value order) so both
# columns always have the same length and stay aligned with SQL_SERVER_TABLE_LIST.
pkDF = pd.DataFrame({
    "table": SQL_SERVER_TABLE_LIST,
    "primary_keys": [SQL_TABLE_PRIMARY_KEYS[table] for table in SQL_SERVER_TABLE_LIST],
})
print("Below are identified primary keys for migrating mssql table to bigquery:")
pkDF

# Step 6: Create JAR files and Upload to Cloud Storage
<div class="alert alert-block alert-info">
<b>NOTE: </b> Run Step 6 one time for each new notebook instance</div>

In [None]:
# Switch the kernel's working directory to the templates' python folder; the
# relative paths used by the remaining Step 6 cells (./setup.py, main.py and
# the downloaded jar files) are resolved from here.
# NOTE: the change persists for the rest of the kernel session, so re-running
# the cell that derives WORKING_DIRECTORY from os.getcwd() afterwards yields a
# different path.
%cd $WORKING_DIRECTORY

### Get JDBC Connector jars

In [None]:
%%bash
# Fetch the MSSQL JDBC driver and the Spark BigQuery connector into the
# current directory. Abort on the first failing command.
set -euo pipefail

for url in \
    "https://repo1.maven.org/maven2/com/microsoft/sqlserver/mssql-jdbc/6.4.0.jre8/mssql-jdbc-6.4.0.jre8.jar" \
    "https://repo1.maven.org/maven2/com/google/cloud/spark/spark-bigquery-with-dependencies_2.12/0.27.0/spark-bigquery-with-dependencies_2.12-0.27.0.jar"
do
    jar="$(basename "$url")"

    # Re-running this cell must not create "<name>.jar.1" duplicates, so jars
    # that are already present are left alone.
    if [ -f "$jar" ]; then
        echo "$jar already present, skipping download"
        continue
    fi

    # Download under a temporary name first so that an interrupted transfer
    # never leaves a truncated file under the final jar name.
    wget -O "$jar.part" "$url"
    mv "$jar.part" "$jar"
done

### Build Dataproc Templates python package

In [None]:
# Build the Dataproc Templates python package as an egg. ./setup.py is resolved
# relative to the current working directory.
# NOTE(review): --output is presumably a custom option handled by this repo's
# setup.py that names the resulting egg file — confirm against setup.py.
! python ./setup.py bdist_egg --output=$PACKAGE_EGG_FILE

### Copying JAR files to GCS_STAGING_LOCATION

In [None]:
# Upload everything the Dataproc jobs need to the staging location:
# the entry-point script and the packaged templates go to its root ...
! gsutil cp main.py $GCS_STAGING_LOCATION/
! gsutil cp -r $PACKAGE_EGG_FILE $GCS_STAGING_LOCATION/
# ... and the two connector jars go under jars/, the paths listed in JARS.
! gsutil cp mssql-jdbc-6.4.0.jre8.jar $GCS_STAGING_LOCATION/jars/mssql-jdbc-6.4.0.jre8.jar
! gsutil cp spark-bigquery-with-dependencies_2.12-0.27.0.jar $GCS_STAGING_LOCATION/jars/spark-bigquery-with-dependencies_2.12-0.27.0.jar

# Step 7: Calculate Parallel Jobs for MSSQL to BigQuery

This step uses MAX_PARALLELISM parameter to calculate number of parallel jobs to run

In [None]:
# calculate parallel jobs:
# Split the tables into consecutive batches of at most MAX_PARALLELISM tables;
# each batch becomes one pipeline whose Dataproc jobs run in parallel.
if not isinstance(MAX_PARALLELISM, int) or MAX_PARALLELISM < 1:
    # A zero or negative value would otherwise fail obscurely or never terminate.
    raise ValueError("MAX_PARALLELISM must be a positive integer, got {!r}".format(MAX_PARALLELISM))

PARALLEL_JOBS = len(SQL_SERVER_TABLE_LIST)//MAX_PARALLELISM
# Slicing leaves SQL_SERVER_TABLE_LIST untouched, so no working copy is needed.
JOB_LIST = [
    SQL_SERVER_TABLE_LIST[start:start + MAX_PARALLELISM]
    for start in range(0, len(SQL_SERVER_TABLE_LIST), MAX_PARALLELISM)
]
print("list of tables for execution : ")
print(JOB_LIST)

# Step 8: Get Row Count of Tables and identify Partition Columns

This step uses the PARTITION_THRESHOLD parameter (default value is 1 million): any table with more rows than PARTITION_THRESHOLD is partitioned on one of its primary key columns.
For each such table, the primary key columns are inspected to find an integer column to partition on.

In [None]:
# Tables with more rows than this are candidates for a partitioned read.
PARTITION_THRESHOLD = 1000000
# Despite its name this is a dict: "schema.table" ->
# [partition column, lower bound, upper bound, number of partitions].
CHECK_PARTITION_COLUMN_LIST={}

In [None]:
# SQL Server integer types accepted as a numeric partition column.
INTEGER_DATA_TYPES = ("int", "bigint", "smallint", "tinyint")

# Bound parameters keep schema/table/column names out of the SQL text.
column_datatype_query = text(
    "SELECT DATA_TYPE FROM INFORMATION_SCHEMA.COLUMNS "
    "WHERE TABLE_SCHEMA = :table_schema "
    "AND TABLE_NAME = :table_name "
    "AND COLUMN_NAME = :column_name"
)

with DB.connect() as conn:
    for table in SQL_SERVER_TABLE_LIST:
        # Tables without a primary key cannot be partitioned by this notebook.
        primary_keys = SQL_TABLE_PRIMARY_KEYS.get(table) or ""
        if not primary_keys:
            continue

        row_count = conn.execute(text("SELECT count(1) FROM {}".format(table))).fetchall()[0][0]
        if row_count <= PARTITION_THRESHOLD:
            continue

        table_schema, table_name = table.split(".", 1)
        for column in primary_keys.split(","):
            datatype_rows = conn.execute(
                column_datatype_query,
                {"table_schema": table_schema, "table_name": table_name, "column_name": column},
            ).fetchall()
            if not datatype_rows or datatype_rows[0][0] not in INTEGER_DATA_TYPES:
                continue

            # Fetch both bounds in a single round trip.
            lowerbound, upperbound = conn.execute(
                text("SELECT min({0}), max({0}) from {1}".format(column, table))
            ).fetchall()[0]
            # Always request at least one partition, even when every row shares
            # the same value in this column.
            numberPartitions = max(1, math.ceil((upperbound - lowerbound) / PARTITION_THRESHOLD))
            # When several key columns qualify, the last one is kept.
            CHECK_PARTITION_COLUMN_LIST[table] = [column, lowerbound, upperbound, numberPartitions]
            print(CHECK_PARTITION_COLUMN_LIST)


# Step 9: Execute Pipeline to Migrate tables from MSSQL to BIGQUERY

* BIGQUERY_DATASET : Target dataset in Bigquery
* BIGQUERY_MODE : Mode of operation at target <append|overwrite|ignore|errorifexists> (default overwrite)
* TEMP_GCS_BUCKET : Bucket name for dataproc job staging
* PYTHON_FILE_URIS : Path to PACKAGE_EGG_FILE
* MAIN_PYTHON_CLASS : Path to main.py

In [None]:
BIGQUERY_DATASET="<bq-dataset>"
BIGQUERY_MODE = "overwrite"  # append/overwrite
TEMP_GCS_BUCKET="<temp-bucket-name>"
# Reference the egg by PACKAGE_EGG_FILE so this path always matches the file
# that was built and uploaded to GCS_STAGING_LOCATION in Step 6.
PYTHON_FILE_URIS = [ GCS_STAGING_LOCATION + "/" + PACKAGE_EGG_FILE ]
MAIN_PYTHON_CLASS = GCS_STAGING_LOCATION + "/main.py"

In [None]:
def migrate_mssql_to_bigquery(EXECUTION_LIST):
    """Compile and run one Vertex AI pipeline that migrates the given tables.

    One DataprocPySparkBatchOp (JDBCTOBIGQUERY template) is added to the
    pipeline per table. Tables present in CHECK_PARTITION_COLUMN_LIST also get
    the partition column, bounds and partition count arguments. Every generated
    batch ID is appended to the global MSSQL_TO_BIGQUERY_JOBS list.

    Args:
        EXECUTION_LIST: list of "schema.table" names to migrate in this pipeline.

    Returns:
        None. The pipeline is compiled to "pipeline.json" and submitted with
        PipelineJob.run().
    """
    aiplatform.init(project=PROJECT,staging_bucket=TEMP_GCS_BUCKET)

    @dsl.pipeline(
        name="python-mssql-to-bigquery-pyspark",
        description="Pipeline to get data from Microsoft SQL Server to BigQuery",
    )
    def pipeline(
        PROJECT_ID: str = PROJECT,
        LOCATION: str = REGION,
        MAIN_PYTHON_CLASS: str = MAIN_PYTHON_CLASS,
        JAR_FILE_URIS: list = JARS,
        SUBNETWORK_URI: str = SUBNET,
        SERVICE_ACCOUNT: str = SERVICE_ACCOUNT,
        PYTHON_FILE_URIS: list = PYTHON_FILE_URIS
    ):
        for table in EXECUTION_LIST:
            # Epoch seconds computed portably; strftime("%s") is a
            # platform-specific directive that is not available everywhere.
            BATCH_ID = "mssql2bigquery-{}".format(int(datetime.now().timestamp()))
            MSSQL_TO_BIGQUERY_JOBS.append(BATCH_ID)

            # Arguments shared by every table.
            TEMPLATE_SPARK_ARGS = [
                "--template=JDBCTOBIGQUERY",
                "--jdbc.bigquery.input.url={}".format(JDBC_URL),
                "--jdbc.bigquery.input.driver={}".format(JDBC_DRIVER),
                "--jdbc.bigquery.input.table={}".format(table),
                "--jdbc.bigquery.output.mode={}".format(BIGQUERY_MODE),
                "--jdbc.bigquery.output.table={}".format(table.split('.')[1]),
                "--jdbc.bigquery.temp.bucket.name={}".format(TEMP_GCS_BUCKET),
                "--jdbc.bigquery.output.dataset={}".format(BIGQUERY_DATASET)
            ]

            # Extra arguments only for tables selected for a partitioned read.
            partition_config = CHECK_PARTITION_COLUMN_LIST.get(table)
            if partition_config is not None:
                partition_column, lower_bound, upper_bound, num_partitions = partition_config
                TEMPLATE_SPARK_ARGS += [
                    "--jdbc.bigquery.input.partitioncolumn={}".format(partition_column),
                    "--jdbc.bigquery.input.lowerbound={}".format(lower_bound),
                    "--jdbc.bigquery.input.upperbound={}".format(upper_bound),
                    "--jdbc.bigquery.numpartitions={}".format(num_partitions)
                ]

            _ = DataprocPySparkBatchOp(
                project=PROJECT_ID,
                location=LOCATION,
                batch_id=BATCH_ID,
                main_python_file_uri=MAIN_PYTHON_CLASS,
                jar_file_uris=JAR_FILE_URIS,
                python_file_uris=PYTHON_FILE_URIS,
                subnetwork_uri=SUBNETWORK_URI,
                service_account=SERVICE_ACCOUNT,
                runtime_config_version="1.1", # issue 665
                args=TEMPLATE_SPARK_ARGS
                )
            # Batch IDs have one-second resolution; waiting between tables
            # keeps consecutive IDs distinct.
            time.sleep(3)

    compiler.Compiler().compile(pipeline_func=pipeline, package_path="pipeline.json")

    # Use a separate name for the job so the pipeline function is not rebound.
    pipeline_job = aiplatform.PipelineJob(
        display_name="pipeline",
        template_path="pipeline.json",
        pipeline_root=PIPELINE_ROOT,
        enable_caching=False,
        location=REGION,
    )
    pipeline_job.run(service_account=SERVICE_ACCOUNT)

In [None]:
# Start from an empty job list so that re-running this cell does not keep batch
# IDs from an earlier run; the status table built later needs exactly one job
# per migrated table. Cleared in place because the migration function appends
# to this same list object.
MSSQL_TO_BIGQUERY_JOBS.clear()

for execution_list in JOB_LIST:
    print(execution_list)
    migrate_mssql_to_bigquery(execution_list)

# Step 10: Get status for tables migrated from SQL Server to BIGQUERY

In [None]:
def get_bearer_token():
    """Obtain an OAuth2 access token from the default Google credentials.

    Returns:
        A 2-tuple in every case:
        (token, 200) when a token was obtained, or
        (error message, 500) when no token could be generated.
    """
    try:
        # Imported locally so the function does not depend on the module-level
        # name `requests`, which another cell rebinds to the HTTP library.
        import google.auth
        from google.auth.transport.requests import Request

        # Defining Scope
        CREDENTIAL_SCOPES = ["https://www.googleapis.com/auth/cloud-platform"]

        # Assigning credentials (the returned project id is not needed here)
        credentials, _ = google.auth.default(scopes=CREDENTIAL_SCOPES)

        # Refreshing credentials data
        credentials.refresh(Request())

        # Get refreshed token
        token = credentials.token
        if token:
            return (token, 200)
        # Same (message, status) shape as the other outcomes, so callers can
        # always inspect index 1.
        return ("Bearer token not generated", 500)
    except Exception as error:
        return ("Bearer token not generated. Error : {}".format(error), 500)

In [None]:
from google.auth.transport import requests
# Import the submodule explicitly so `google.auth` is guaranteed to be loaded.
import google.auth

# A single call is enough; each call refreshes the credentials over the network.
token = get_bearer_token()
if token[1] == 200:
    print("Bearer token generated")
else:
    print(token)

In [None]:
import requests

job_status_url = "https://dataproc.googleapis.com/v1/projects/{}/locations/{}/batches/{}"

# The same headers are sent with every request, so build them once.
auth = "Bearer " + token[0]
headers = {
  'Content-Type': 'application/json; charset=UTF-8',
  'Authorization': auth
}

mssql_to_bigquery_status = []
for job in MSSQL_TO_BIGQUERY_JOBS:
    url = job_status_url.format(PROJECT,REGION,job)
    # A timeout keeps the cell from hanging forever on a stalled connection.
    response = requests.get(url, headers=headers, timeout=60)
    try:
        payload = response.json()
    except ValueError:
        # Body was not valid JSON.
        payload = {}
    state = payload.get('state') if isinstance(payload, dict) else None
    if state is None:
        # Error responses carry no 'state'; record a placeholder so the list
        # keeps one entry per job.
        state = "UNKNOWN (HTTP {})".format(response.status_code)
    mssql_to_bigquery_status.append(state)

In [None]:
# One row per table: the Dataproc batch that migrated it and the batch's state.
status_columns = {
    "table": SQL_SERVER_TABLE_LIST,
    "mssql_to_bigquery_job": MSSQL_TO_BIGQUERY_JOBS,
    "mssql_to_bigquery_status": mssql_to_bigquery_status,
}
statusDF = pd.DataFrame(status_columns)
statusDF

# Step 11: Validate row counts of migrated tables from SQL Server to BigQuery

In [None]:
# Per-table row counts, in SQL_SERVER_TABLE_LIST order. They are filled by the
# next two cells and then added to statusDF as columns.
mssql_row_count = []
bq_row_count = []

In [None]:
# get mssql table counts
DB = sqlalchemy.create_engine(
    sqlalchemy.engine.url.URL.create(
        drivername=SQL_SERVER_DRIVER,
        username=SQL_SERVER_USERNAME,
        password=SQL_SERVER_PASSWORD,
        database=SQL_SERVER_DATABASE,
        host=SQL_SERVER_HOST,
        port=SQL_SERVER_PORT
    )
)

# Rebuild the list from scratch so that re-running this cell does not append a
# second set of counts to the previous one.
mssql_row_count = []
with DB.connect() as conn:
    for table in SQL_SERVER_TABLE_LIST:
        # count(*) returns exactly one row with one column.
        mssql_row_count.append(
            conn.execute(text("select count(*) from {}".format(table))).scalar()
        )

In [None]:
from google.cloud import bigquery

# Construct a BigQuery client object bound to the project used by this notebook.
client = bigquery.Client(project=PROJECT)

# Rebuild the list from scratch so that re-running this cell does not append a
# second set of counts to the previous one.
bq_row_count = []
for table in SQL_SERVER_TABLE_LIST:
    query_job = client.query("SELECT row_count FROM {}.__TABLES__ where table_id = '{}'".format(BIGQUERY_DATASET,table.split('.')[1]))
    rows = list(query_job.result())
    # A table that is missing in BigQuery returns no row; record None so the
    # list keeps one entry per source table.
    bq_row_count.append(rows[0][0] if rows else None)

In [None]:
# Put the source and target row counts next to each table's job status.
statusDF = statusDF.assign(
    mssql_row_count=mssql_row_count,
    bq_row_count=bq_row_count,
)
statusDF