notebooks/mssql2bq/mssql-to-bigquery-notebook.ipynb (911 lines of code) (raw):

{ "cells": [ { "cell_type": "code", "execution_count": null, "id": "f9a9e6d2-f721-48cc-a6a7-01787aa9362c", "metadata": {}, "outputs": [], "source": [ "# Copyright 2022 Google LLC\n", "#\n", "# Licensed under the Apache License, Version 2.0 (the \"License\");\n", "# you may not use this file except in compliance with the License.\n", "# You may obtain a copy of the License at\n", "#\n", "# https://www.apache.org/licenses/LICENSE-2.0\n", "#\n", "# Unless required by applicable law or agreed to in writing, software\n", "# distributed under the License is distributed on an \"AS IS\" BASIS,\n", "# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.\n", "# See the License for the specific language governing permissions and\n", "# limitations under the License." ] }, { "cell_type": "markdown", "id": "e5876b8a-4fc8-43d5-93f6-fbd19fb2c433", "metadata": {}, "source": [ "# MSSQL to BigQuery Migration\n", "<table align=\"left\">\n", "<td>\n", " <a href=\"https://colab.research.google.com/github/GoogleCloudPlatform/dataproc-templates/blob/main/notebooks/mssql2bq/mssql-to-bigquery-notebook.ipynb\">\n", " <img src=\"../images/colab-logo-32px.png\" alt=\"Colab logo\" />Run in Colab\n", " </a>\n", "</td>\n", "<td>\n", " <a href=\"https://console.cloud.google.com/vertex-ai/colab/import/https:%2F%2Fraw.githubusercontent.com%2FGoogleCloudPlatform%2Fdataproc-templates%2Fmain%2Fnotebooks%2Fmssql2bq%2Fmssql-to-bigquery-notebook.ipynb\">\n", " <img src=\"../images/colab-enterprise-logo-32px.png\" alt=\"GCP Colab Enterprise logo\" />Run in Colab Enterprise\n", " </a>\n", "</td>\n", "<td>\n", " <a href=\"https://github.com/GoogleCloudPlatform/dataproc-templates/blob/main/notebooks/mssql2bq/mssql-to-bigquery-notebook.ipynb\">\n", " <img src=\"../images/github-logo-32px.png\" alt=\"GitHub logo\" />View on GitHub\n", " </a>\n", "</td>\n", "<td>\n", " <a href=\"https://console.cloud.google.com/vertex-ai/workbench/deploy-notebook?download_url=https://raw.githubusercontent.com/GoogleCloudPlatform/dataproc-templates/main/notebooks/mssql2bq/mssql-to-bigquery-notebook.ipynb\">\n", " <img src=\"../images/vertexai.jpg\" alt=\"Vertex AI logo\" />Open in Vertex AI Workbench\n", " </a>\n", "</td>\n", "</table>" ] }, { "cell_type": "markdown", "id": "79b8102d-df42-448a-9df4-35ae6b8fae07", "metadata": {}, "source": [ "## References\n", "* [DataprocPySparkBatchOp reference](https://google-cloud-pipeline-components.readthedocs.io/en/google-cloud-pipeline-components-1.0.0/google_cloud_pipeline_components.experimental.dataproc.html) \n", "* [Kubeflow SDK Overview](https://www.kubeflow.org/docs/components/pipelines/sdk/sdk-overview/)\n", "* [Dataproc Serverless in Vertex AI Pipelines tutorial](https://github.com/GoogleCloudPlatform/vertex-ai-samples/blob/main/notebooks/community/ml_ops/stage3/get_started_with_dataproc_serverless_pipeline_components.ipynb)\n", "* [Build a Vertex AI Pipeline](https://cloud.google.com/vertex-ai/docs/pipelines/build-pipeline)\n", "\n", "This notebook is built to run a Vertex AI User-Managed Notebook using the default Compute Engine Service Account. Check the Dataproc Serverless in Vertex AI Pipelines tutorial linked above to learn how to setup a different Service Account.\n", "\n", "## Permissions\n", "Make sure that the service account used to run the notebook has the following roles:\n", "\n", "* roles/aiplatform.serviceAgent\n", "* roles/aiplatform.customCodeServiceAgent\n", "* roles/storage.objectCreator\n", "* roles/storage.objectViewer\n", "* roles/dataproc.editor\n", "* roles/dataproc.worker" ] }, { "cell_type": "markdown", "id": "5e06a8de-c8b0-4ada-81c7-54606c0afdb9", "metadata": {}, "source": [ "# Step 1: Install Libraries\n", "<div class=\"alert alert-block alert-info\">\n", "<b>NOTE: </b>Run Step 1 one time for each new notebook instance</div" ] }, { "cell_type": "code", "execution_count": null, "id": "258dae17-19f2-410a-8cc9-f5980ccc30f4", "metadata": {}, "outputs": [], "source": [ "!pip3 install pymssql SQLAlchemy\n", "!pip3 install --upgrade google-cloud-pipeline-components kfp -q\n", "!pip3 install google.cloud.aiplatform" ] }, { "cell_type": "markdown", "id": "d5a7126d-fc9c-4f04-8031-cbd835f891c6", "metadata": {}, "source": [ "### Once you've installed the additional packages, you may need to restart the notebook kernel so it can find the packages.\n", "\n", "Uncomment & Run this cell if you have installed anything from above commands" ] }, { "cell_type": "code", "execution_count": null, "id": "74245439-10f3-4631-b8cd-4c077318b737", "metadata": {}, "outputs": [], "source": [ "# import os\n", "# import IPython\n", "# if not os.getenv(\"IS_TESTING\"):\n", "# app = IPython.Application.instance()\n", "# app.kernel.do_shutdown(True)" ] }, { "cell_type": "markdown", "id": "50fe84b1-eb32-4cc2-a4e2-f0bd98da5b90", "metadata": {}, "source": [ "# Step 2: Import Libraries" ] }, { "cell_type": "code", "execution_count": null, "id": "9f225135-cfe5-4ddd-9659-ead437aa414c", "metadata": { "pycharm": { "is_executing": true } }, "outputs": [], "source": [ "import google.cloud.aiplatform as aiplatform\n", "import sys, os\n", "from kfp import dsl\n", "from kfp import compiler\n", "from datetime import datetime\n", "import time\n", "import copy\n", "import json\n", "import pandas as pd\n", "\n", "try:\n", " from google_cloud_pipeline_components.experimental.dataproc import DataprocPySparkBatchOp\n", "except ModuleNotFoundError:\n", " from google_cloud_pipeline_components.v1.dataproc import DataprocPySparkBatchOp\n", " \n", "import sqlalchemy\n", "from sqlalchemy import text\n", "import pymssql\n", "import math\n", "from pathlib import Path" ] }, { "attachments": {}, "cell_type": "markdown", "id": "af0cf264-6eef-441d-a0a4-0c7bdc3d5b96", "metadata": {}, "source": [ "# Step 3: Assign Parameters\n", "\n", "## Step 3.1 Common Parameters\n", "\n", "PROJECT : GCP project-id\n", "\n", "REGION : GCP region\n", "\n", "GCS_STAGING_LOCATION : Cloud Storage staging location to be used for this notebook to store artifacts\n", "\n", "SUBNET : VPC subnet\n", "\n", "JARS : list of jars. For this notebook mssql connectora and postgres connectorjar is required in addition with the dataproc template\n", "\n", "MAX_PARALLELISM : Parameter for number of jobs to run in parallel default value is 5\n", "\n", "SERVICE_ACCOUNT: Custom service account email to use for vertex ai pipeline and dataproc job with above mentioned permissions\n", "\n", "MSSQL_TO_BIGQUERY_JOBS : List of bigquery job IDs that will be created by Vertex AI pipelines to migrate data from source to BQ." ] }, { "cell_type": "code", "execution_count": null, "id": "ee427d7c-edcb-46b2-a825-624db784aff0", "metadata": {}, "outputs": [], "source": [ "PROJECT = \"<project-id>\"\n", "REGION = \"<region>\"\n", "GCS_STAGING_LOCATION = \"gs://<staging-bucket>\"\n", "SUBNET = \"projects/<project-id>/regions/<region>/subnetworks/<subnet-name>\"\n", "MAX_PARALLELISM = 5 # default value is set to 5\n", "SERVICE_ACCOUNT = \"\"\n", "MSSQL_TO_BIGQUERY_JOBS = []\n", "\n", "# Do not change this parameter unless you want to refer below JARS from new location\n", "JARS = [GCS_STAGING_LOCATION + \"/jars/mssql-jdbc-6.4.0.jre8.jar\", GCS_STAGING_LOCATION + \"/jars/spark-bigquery-with-dependencies_2.12-0.27.0.jar\"]\n", "\n", "# If SERVICE_ACCOUNT is not specified it will take the one attached to Notebook\n", "if SERVICE_ACCOUNT == '':\n", " shell_output = !gcloud auth list 2>/dev/null\n", " SERVICE_ACCOUNT = shell_output[2].replace(\"*\", \"\").strip()\n", " print(\"Service Account: \",SERVICE_ACCOUNT)\n", " \n" ] }, { "cell_type": "markdown", "id": "f129a976-154d-4627-aa08-6430eac9b3fc", "metadata": {}, "source": [ "## Step 3.2 MSSQL Parameters\n", "\n", "SQL_SERVER_HOST : MSSQL instance ip address\n", "\n", "SQL_SERVER_PORT : MSSQL instance port\n", "\n", "SQL_SERVER_USERNAME : MSSQL username\n", "\n", "SQL_SERVER_PASSWORD : MSSQL password\n", "\n", "SQL_SERVER_DATABASE : name of database that you want to migrate\n", "\n", "SQL_SERVER_TABLE_LIST : list of tables you want to migrate eg ['schema.table1','schema.table2'] else provide an empty list for migration of specific schemas or the whole database eg : []\n", "\n", "SQL_SERVER_SCHEMA_LIST : List of schemas. Use this if you'ld like to migrate all tables associated with specific schemas eg. ['schema1','schema2']. If otherwise, leave this parameter empty eg []. \n", "\n", "<div class=\"alert alert-block alert-warning\">\n", "<b>NOTE: </b>Please ensure that SQL_SERVER_SCHEMA_LIST and SQL_SERVER_TABLE_LIST are not used simultaneously. Use only one of the two prameters at a time or leave both of these empty to migrate the entire database.</div>" ] }, { "cell_type": "code", "execution_count": null, "id": "e1aa479e-8318-4c5d-a36b-fdb497a6053a", "metadata": {}, "outputs": [], "source": [ "SQL_SERVER_HOST = \"<host-ip-address>\"\n", "SQL_SERVER_PORT = \"1433\"\n", "SQL_SERVER_USERNAME = \"<user-name>\"\n", "SQL_SERVER_PASSWORD = \"<password>\"\n", "SQL_SERVER_DATABASE = \"<database-name>\"\n", "SQL_SERVER_TABLE_LIST=[] # leave list empty for migrating complete database else provide tables as ['dbo.table1','sys.table2']. If this parameter is not empty, leave SQL_SERVER_SCHEMA_LIST empty.\n", "SQL_SERVER_SCHEMA_LIST=[] # leave list empty for migrating complete database else provide schema as ['schema1','schema2'] for migrating all tables in specific schemas. If this parameter is not empty, leave SQL_SERVER_TABLE_LIST empty." ] }, { "cell_type": "markdown", "id": "73cd7d7f-b70e-4f09-8b7d-c96038cc892c", "metadata": {}, "source": [ "# Step 3.3 Notebook Configuration Parameters\n", "\n", "<div class=\"alert alert-block alert-warning\">\n", "<b>NOTE: </b>Below variables should not be changed unless required</div>\n", "\n", "SQL_SERVER_DRIVER : MSSQL Driver\n", "\n", "JDBC_DRIVER : JDBC driver class\n", "\n", "JDBC_URL : MSSQL jdbc url\n", "\n", "MAIN_CLASS : Dataproc Template Main Class\n", "\n", "WORKING_DIRECTORY : Python working directory\n", "\n", "PACKAGE_EGG_FILE : Dataproc Template distributio file\n", "\n", "PIPELINE_ROOT : Path to Vertex AI pipeline artifacts" ] }, { "cell_type": "code", "execution_count": null, "id": "9f17741a-37e6-4657-bb3b-11b584270015", "metadata": {}, "outputs": [], "source": [ "SQL_SERVER_DRIVER = \"mssql+pymssql\"\n", "JDBC_DRIVER = \"com.microsoft.sqlserver.jdbc.SQLServerDriver\"\n", "JDBC_URL = \"jdbc:sqlserver://{}:{};databaseName={};user={};password={}\".format(SQL_SERVER_HOST,SQL_SERVER_PORT,SQL_SERVER_DATABASE,SQL_SERVER_USERNAME,SQL_SERVER_PASSWORD)\n", "MAIN_CLASS = \"com.google.cloud.dataproc.templates.main.DataProcTemplate\"\n", "PACKAGE_EGG_FILE = \"dataproc_templates_distribution.egg\"\n", "PIPELINE_ROOT = GCS_STAGING_LOCATION + \"/pipeline_root/dataproc_pyspark\"\n" ] }, { "cell_type": "code", "execution_count": null, "id": "6960dba0", "metadata": {}, "outputs": [], "source": [ "cur_path = Path(os.getcwd())\n", "WORKING_DIRECTORY = os.path.join(cur_path.parent.parent ,'python')\n", "\n", "# If the above code doesn't fetches the correct path please\n", "# provide complete path to python folder in your dataproc \n", "# template repo which you cloned \n", "\n", "# WORKING_DIRECTORY = \"/home/jupyter/dataproc-templates/python/\"\n", "print(WORKING_DIRECTORY)" ] }, { "cell_type": "markdown", "id": "d9323378-426e-4593-8b22-e6b20b64bfe9", "metadata": {}, "source": [ "# Step 4: Generate SQL SERVER Table List\n", "\n", "This step creates list of tables for migration. \n", "\n", "* If SQL_SERVER_TABLE_LIST and SQL_SERVER_SCHEMA_LIST are kept empty, then all the tables in the SQL_SERVER_DATABASE are listed for migration.\n", "\n", "* If SQL_SERVER_SCHEMA_LIST is non empty, then all tables associated with the mentioned schemas will be listed for migration\n", "\n", "* If SQL_SERVER_TABLE_LIST is non empty, then the provided list of tables are selected for migration" ] }, { "cell_type": "code", "execution_count": null, "id": "35d75bb9-8d10-4f0e-989b-8b81727ada7f", "metadata": {}, "outputs": [], "source": [ "if SQL_SERVER_SCHEMA_LIST and SQL_SERVER_TABLE_LIST:\n", " sys.exit(\"Please provide values for either SQL_SERVER_SCHEMA_LIST OR SQL_SERVER_TABLE_LIST. Non empty values for both the values at the same time are not accepted\")" ] }, { "cell_type": "code", "execution_count": null, "id": "3f433106-453d-445f-bcda-8aad4efc0d95", "metadata": {}, "outputs": [], "source": [ "DB = sqlalchemy.create_engine(\n", " sqlalchemy.engine.url.URL.create(\n", " drivername=SQL_SERVER_DRIVER,\n", " username=SQL_SERVER_USERNAME,\n", " password=SQL_SERVER_PASSWORD,\n", " database=SQL_SERVER_DATABASE,\n", " host=SQL_SERVER_HOST,\n", " port=SQL_SERVER_PORT\n", " )\n", " )\n", "\n", "with DB.connect() as conn:\n", " print(\"connected to database\")\n", " if not SQL_SERVER_TABLE_LIST and not SQL_SERVER_SCHEMA_LIST: # Migrate all possible tables from database\n", " results = conn.execute(text('select TABLE_SCHEMA,TABLE_NAME from INFORMATION_SCHEMA.Tables')).fetchall()\n", " \n", " elif SQL_SERVER_SCHEMA_LIST and not SQL_SERVER_TABLE_LIST: # Only Migrate tables associated with the provided schema list\n", " results = conn.execute(text(\"select TABLE_SCHEMA,TABLE_NAME from INFORMATION_SCHEMA.Tables where TABLE_SCHEMA in ('{}');\".format(\"','\".join(SQL_SERVER_SCHEMA_LIST)))).fetchall()\n", " \n", " # when SQL_SERVER_TABLE_LIST is already not empty, only mentioned tables will be migrated\n", " \n", " print(\"Total Tables = \", len(results))\n", " for row in results:\n", " SQL_SERVER_TABLE_LIST.append(row[0]+\".\"+row[1])\n", " \n", " print(\"list of tables for migration :\")\n", " print(SQL_SERVER_TABLE_LIST)\n", " " ] }, { "cell_type": "markdown", "id": "7359b268-ceec-4694-91ec-63f8469040dc", "metadata": {}, "source": [ "# Step 5: Get Primary Keys for partitioning the tables\n", "\n", "This step fetches primary key from SQL_SERVER_DATABASE for the tables listed for migration" ] }, { "cell_type": "code", "execution_count": null, "id": "8346e717-5bd3-471f-938e-b64fc1721b3f", "metadata": {}, "outputs": [], "source": [ "with DB.connect() as conn:\n", " SQL_TABLE_PRIMARY_KEYS = {}\n", " for table in SQL_SERVER_TABLE_LIST:\n", " primary_keys = []\n", " results = conn.execute(text(\"SELECT COLUMN_NAME FROM INFORMATION_SCHEMA.TABLE_CONSTRAINTS T JOIN INFORMATION_SCHEMA.KEY_COLUMN_USAGE K ON K.CONSTRAINT_NAME=T.CONSTRAINT_NAME WHERE K.TABLE_NAME='{0}' AND K.TABLE_SCHEMA='{1}' AND T.CONSTRAINT_TYPE='PRIMARY KEY';\".format(table.split(\".\")[1],table.split(\".\")[0]))).fetchall()\n", " # print(results)\n", " for row in results:\n", " primary_keys.append(row[0])\n", " if primary_keys:\n", " SQL_TABLE_PRIMARY_KEYS[table] = \",\".join(primary_keys)\n", " else:\n", " SQL_TABLE_PRIMARY_KEYS[table] = \"\"\n", "\n" ] }, { "cell_type": "code", "execution_count": null, "id": "51baf6f6-2c68-4c63-adfd-f474aea6c48e", "metadata": {}, "outputs": [], "source": [ "pkDF = pd.DataFrame({\"table\" : SQL_SERVER_TABLE_LIST, \"primary_keys\": list(SQL_TABLE_PRIMARY_KEYS.values())})\n", "print(\"Below are identified primary keys for migrating mssql table to bigquery:\")\n", "pkDF" ] }, { "attachments": {}, "cell_type": "markdown", "id": "ba873f01-cda3-4954-aac6-6ddf7df54642", "metadata": {}, "source": [ "# Step 6: Create JAR files and Upload to Cloud Storage\n", "<div class=\"alert alert-block alert-info\">\n", "<b>NOTE: </b> Run Step 6 one time for each new notebook instance</div>" ] }, { "cell_type": "code", "execution_count": null, "id": "7603c8f5-0dbd-4fac-abcd-e6df133133f6", "metadata": {}, "outputs": [], "source": [ "%cd $WORKING_DIRECTORY" ] }, { "cell_type": "markdown", "id": "4d6bed57-868d-4f87-8311-33359004d0a0", "metadata": {}, "source": [ "### Get JDBC Connector jars" ] }, { "cell_type": "code", "execution_count": null, "id": "b95e378d-1da3-4c61-a671-f1d8b9c53d65", "metadata": {}, "outputs": [], "source": [ "%%bash\n", "\n", "wget https://repo1.maven.org/maven2/com/microsoft/sqlserver/mssql-jdbc/6.4.0.jre8/mssql-jdbc-6.4.0.jre8.jar\n", "\n", "wget https://repo1.maven.org/maven2/com/google/cloud/spark/spark-bigquery-with-dependencies_2.12/0.27.0/spark-bigquery-with-dependencies_2.12-0.27.0.jar" ] }, { "cell_type": "markdown", "id": "7ca2ab48-638d-41bb-a29a-4361fcc0a138", "metadata": {}, "source": [ "### Build Dataproc Templates python package" ] }, { "cell_type": "code", "execution_count": null, "id": "2965a564-9a68-4092-82bc-904661ba01e4", "metadata": {}, "outputs": [], "source": [ "! python ./setup.py bdist_egg --output=$PACKAGE_EGG_FILE" ] }, { "cell_type": "markdown", "id": "fab7c84b-a310-4c77-ae96-0a444a989e67", "metadata": {}, "source": [ "### Copying JAR files to GCS_STAGING_LOCATION" ] }, { "cell_type": "code", "execution_count": null, "id": "6c70baf3-2b11-4f84-b948-1ea70fb932ff", "metadata": {}, "outputs": [], "source": [ "! gsutil cp main.py $GCS_STAGING_LOCATION/\n", "! gsutil cp -r $PACKAGE_EGG_FILE $GCS_STAGING_LOCATION/\n", "! gsutil cp mssql-jdbc-6.4.0.jre8.jar $GCS_STAGING_LOCATION/jars/mssql-jdbc-6.4.0.jre8.jar\n", "! gsutil cp spark-bigquery-with-dependencies_2.12-0.27.0.jar $GCS_STAGING_LOCATION/jars/spark-bigquery-with-dependencies_2.12-0.27.0.jar" ] }, { "cell_type": "markdown", "id": "53007b43-28c7-41db-8698-e70f4a84b8ea", "metadata": {}, "source": [ "# Step 7: Calculate Parallel Jobs for MSSQL to BigQuery\n", "\n", "This step uses MAX_PARALLELISM parameter to calculate number of parallel jobs to run" ] }, { "cell_type": "code", "execution_count": null, "id": "365e3ca3-4381-4191-b8ee-43659ba85940", "metadata": {}, "outputs": [], "source": [ "# calculate parallel jobs:\n", "COMPLETE_LIST = copy.deepcopy(SQL_SERVER_TABLE_LIST)\n", "PARALLEL_JOBS = len(SQL_SERVER_TABLE_LIST)//MAX_PARALLELISM\n", "JOB_LIST = []\n", "while len(COMPLETE_LIST) > 0:\n", " SUB_LIST = []\n", " for i in range(MAX_PARALLELISM):\n", " if len(COMPLETE_LIST)>0 :\n", " SUB_LIST.append(COMPLETE_LIST[0])\n", " COMPLETE_LIST.pop(0)\n", " else:\n", " break\n", " JOB_LIST.append(SUB_LIST)\n", "print(\"list of tables for execution : \")\n", "print(JOB_LIST)" ] }, { "cell_type": "markdown", "id": "57e663cc-a269-4beb-87e1-740f772e9eb4", "metadata": {}, "source": [ "# Step 8: Get Row Count of Tables and identify Partition Columns\n", "\n", "This step uses PARTITION_THRESHOLD (default value is 1 million) parameter and any table having rows greater than PARTITION_THRESHOLD will be partitioned based on Primary Keys\n", "Get Primary keys for all tables to be migrated and find an integer column to partition on" ] }, { "cell_type": "code", "execution_count": null, "id": "636b871e-c975-4a0b-bc51-7069ce77e51e", "metadata": {}, "outputs": [], "source": [ "PARTITION_THRESHOLD = 1000000\n", "CHECK_PARTITION_COLUMN_LIST={}" ] }, { "cell_type": "code", "execution_count": null, "id": "f2055755-60ad-4371-a936-bfbfa8aaedb8", "metadata": {}, "outputs": [], "source": [ "with DB.connect() as conn:\n", " for table in SQL_SERVER_TABLE_LIST:\n", " results = conn.execute(text(\"SELECT count(1) FROM {}\".format(table))).fetchall()\n", " # print(results)\n", " if results[0][0]>PARTITION_THRESHOLD and len(SQL_TABLE_PRIMARY_KEYS.get(table).split(\",\")[0])>0:\n", " column_list=SQL_TABLE_PRIMARY_KEYS.get(table).split(\",\")\n", " for column in column_list:\n", " results_datatype = conn.execute(text(\"SELECT DATA_TYPE FROM INFORMATION_SCHEMA.COLUMNS WHERE TABLE_SCHEMA = '{0}' AND TABLE_NAME = '{1}' AND COLUMN_NAME = '{2}'\".format(table.split(\".\")[0],table.split(\".\")[1],column))).fetchall()\n", " # print(results_datatype)\n", " if results_datatype[0][0]==\"int\":\n", " lowerbound = conn.execute(text(\"SELECT min({0}) from {1}\".format(column,table))).fetchall()\n", " upperbound = conn.execute(text(\"SELECT max({0}) from {1}\".format(column,table))).fetchall()\n", " numberPartitions = math.ceil((upperbound[0][0]-lowerbound[0][0])/PARTITION_THRESHOLD)\n", " CHECK_PARTITION_COLUMN_LIST[table]=[column,lowerbound[0][0],upperbound[0][0],numberPartitions]\n", " print(CHECK_PARTITION_COLUMN_LIST)\n" ] }, { "cell_type": "markdown", "id": "ff7d7523-4f87-48e7-bbe5-d1206b817c3d", "metadata": {}, "source": [ "# Step 9: Execute Pipeline to Migrate tables from MSSQL to BIGQUERY\n", "\n", "* BIGQUERY_DATASET : Target dataset in Bigquery\n", "* BIGQUERY_MODE : Mode of operation at target <append|overwrite|ignore|errorifexists> (default overwrite)\n", "* TEMP_GCS_BUCKET : Bucket name for dataproc job staging\n", "* PYTHON_FILE_URIS : Path to PACKAGE_EGG_FILE\n", "* MAIN_PYTHON_CLASS : Path to main.py" ] }, { "cell_type": "code", "execution_count": null, "id": "46e607f2-c6c0-4abe-b4ff-ce34c83fcbf2", "metadata": {}, "outputs": [], "source": [ "BIGQUERY_DATASET=\"<bq-dataset>\"\n", "BIGQUERY_MODE = \"overwrite\" # append/overwrite\n", "TEMP_GCS_BUCKET=\"<temp-bucket-name>\"\n", "PYTHON_FILE_URIS = [ GCS_STAGING_LOCATION + \"/dataproc_templates_distribution.egg\" ]\n", "MAIN_PYTHON_CLASS = GCS_STAGING_LOCATION + \"/main.py\"" ] }, { "cell_type": "code", "execution_count": null, "id": "5f287fa2-8136-4c2b-a4ff-12cce31aa0dd", "metadata": {}, "outputs": [], "source": [ "def migrate_mssql_to_bigquery(EXECUTION_LIST):\n", " EXECUTION_LIST = EXECUTION_LIST\n", " aiplatform.init(project=PROJECT,staging_bucket=TEMP_GCS_BUCKET)\n", " \n", " @dsl.pipeline(\n", " name=\"python-mssql-to-bigquery-pyspark\",\n", " description=\"Pipeline to get data from Microsoft SQL Server to BigQuery\",\n", " )\n", " def pipeline(\n", " PROJECT_ID: str = PROJECT,\n", " LOCATION: str = REGION,\n", " MAIN_PYTHON_CLASS: str = MAIN_PYTHON_CLASS,\n", " JAR_FILE_URIS: list = JARS,\n", " SUBNETWORK_URI: str = SUBNET,\n", " SERVICE_ACCOUNT: str = SERVICE_ACCOUNT,\n", " PYTHON_FILE_URIS: list = PYTHON_FILE_URIS\n", " ):\n", " for table in EXECUTION_LIST:\n", " BATCH_ID = \"mssql2bigquery-{}\".format(datetime.now().strftime(\"%s\"))\n", " MSSQL_TO_BIGQUERY_JOBS.append(BATCH_ID)\n", " if table in CHECK_PARTITION_COLUMN_LIST.keys():\n", " TEMPLATE_SPARK_ARGS = [\n", " \"--template=JDBCTOBIGQUERY\",\n", " \"--jdbc.bigquery.input.url={}\".format(JDBC_URL),\n", " \"--jdbc.bigquery.input.driver={}\".format(JDBC_DRIVER),\n", " \"--jdbc.bigquery.input.table={}\".format(table),\n", " \"--jdbc.bigquery.output.mode={}\".format(BIGQUERY_MODE),\n", " \"--jdbc.bigquery.output.table={}\".format(table.split('.')[1]),\n", " \"--jdbc.bigquery.temp.bucket.name={}\".format(TEMP_GCS_BUCKET),\n", " \"--jdbc.bigquery.output.dataset={}\".format(BIGQUERY_DATASET),\n", " \"--jdbc.bigquery.input.partitioncolumn={}\".format(CHECK_PARTITION_COLUMN_LIST[table][0]),\n", " \"--jdbc.bigquery.input.lowerbound={}\".format(CHECK_PARTITION_COLUMN_LIST[table][1]),\n", " \"--jdbc.bigquery.input.upperbound={}\".format(CHECK_PARTITION_COLUMN_LIST[table][2]),\n", " \"--jdbc.bigquery.numpartitions={}\".format(CHECK_PARTITION_COLUMN_LIST[table][3])\n", " ]\n", " else:\n", " TEMPLATE_SPARK_ARGS = [\n", " \"--template=JDBCTOBIGQUERY\",\n", " \"--jdbc.bigquery.input.url={}\".format(JDBC_URL),\n", " \"--jdbc.bigquery.input.driver={}\".format(JDBC_DRIVER),\n", " \"--jdbc.bigquery.input.table={}\".format(table),\n", " \"--jdbc.bigquery.output.mode={}\".format(BIGQUERY_MODE),\n", " \"--jdbc.bigquery.output.table={}\".format(table.split('.')[1]),\n", " \"--jdbc.bigquery.temp.bucket.name={}\".format(TEMP_GCS_BUCKET),\n", " \"--jdbc.bigquery.output.dataset={}\".format(BIGQUERY_DATASET)\n", " ]\n", "\n", " _ = DataprocPySparkBatchOp(\n", " project=PROJECT_ID,\n", " location=LOCATION,\n", " batch_id=BATCH_ID,\n", " main_python_file_uri=MAIN_PYTHON_CLASS,\n", " jar_file_uris=JAR_FILE_URIS,\n", " python_file_uris=PYTHON_FILE_URIS,\n", " subnetwork_uri=SUBNETWORK_URI,\n", " service_account=SERVICE_ACCOUNT,\n", " runtime_config_version=\"1.1\", # issue 665\n", " args=TEMPLATE_SPARK_ARGS\n", " )\n", " time.sleep(3)\n", "\n", " compiler.Compiler().compile(pipeline_func=pipeline, package_path=\"pipeline.json\")\n", "\n", " pipeline = aiplatform.PipelineJob(\n", " display_name=\"pipeline\",\n", " template_path=\"pipeline.json\",\n", " pipeline_root=PIPELINE_ROOT,\n", " enable_caching=False,\n", " location=REGION,\n", " )\n", " pipeline.run(service_account=SERVICE_ACCOUNT)" ] }, { "cell_type": "code", "execution_count": null, "id": "1b1511c7-b899-4862-9e27-11071167febf", "metadata": {}, "outputs": [], "source": [ "for execution_list in JOB_LIST:\n", " print(execution_list)\n", " migrate_mssql_to_bigquery(execution_list)" ] }, { "cell_type": "markdown", "id": "23abfd4b-3658-4108-a28a-43d9adc28fe7", "metadata": {}, "source": [ "# Step 10: Get status for tables migrated from SQL Server to BIGQUERY" ] }, { "cell_type": "code", "execution_count": null, "id": "35b11938-7875-4c15-acf4-111f7b15b3f2", "metadata": {}, "outputs": [], "source": [ "def get_bearer_token():\n", " \n", " try:\n", " #Defining Scope\n", " CREDENTIAL_SCOPES = [\"https://www.googleapis.com/auth/cloud-platform\"]\n", "\n", " #Assigning credentials and project value\n", " credentials, project_id = google.auth.default(scopes=CREDENTIAL_SCOPES)\n", "\n", " #Refreshing credentials data\n", " credentials.refresh(requests.Request())\n", "\n", " #Get refreshed token\n", " token = credentials.token\n", " if token:\n", " return (token,200)\n", " else:\n", " return \"Bearer token not generated\"\n", " except Exception as error:\n", " return (\"Bearer token not generated. Error : {}\".format(error),500)" ] }, { "cell_type": "code", "execution_count": null, "id": "34feb439-3081-4719-90cd-320994b1422b", "metadata": {}, "outputs": [], "source": [ "from google.auth.transport import requests\n", "import google\n", "\n", "token = get_bearer_token()\n", "token = get_bearer_token()\n", "if token[1] == 200:\n", " print(\"Bearer token generated\")\n", "else:\n", " print(token)" ] }, { "cell_type": "code", "execution_count": null, "id": "f1ffebab-b7fc-4456-a9f6-487a7b9fb18b", "metadata": {}, "outputs": [], "source": [ "import requests\n", "\n", "mssql_to_bigquery_status = []\n", "job_status_url = \"https://dataproc.googleapis.com/v1/projects/{}/locations/{}/batches/{}\"\n", "for job in MSSQL_TO_BIGQUERY_JOBS:\n", " auth = \"Bearer \" + token[0]\n", " url = job_status_url.format(PROJECT,REGION,job)\n", " headers = {\n", " 'Content-Type': 'application/json; charset=UTF-8',\n", " 'Authorization': auth \n", " }\n", " response = requests.get(url, headers=headers)\n", " mssql_to_bigquery_status.append(response.json()['state'])" ] }, { "cell_type": "code", "execution_count": null, "id": "6f63b7e2-ef71-4dda-a7bd-d421390f00ea", "metadata": {}, "outputs": [], "source": [ "statusDF = pd.DataFrame({\"table\" : SQL_SERVER_TABLE_LIST,\"mssql_to_bigquery_job\" : MSSQL_TO_BIGQUERY_JOBS, \"mssql_to_bigquery_status\" : mssql_to_bigquery_status})\n", "statusDF" ] }, { "cell_type": "markdown", "id": "bd34e553-e49c-4848-8a46-df565cb5267f", "metadata": {}, "source": [ "# Step 11: Validate row counts of migrated tables from SQL Server to BigQuery" ] }, { "cell_type": "code", "execution_count": null, "id": "98ec2df4-a190-4e41-9c3e-d1f59cfb76e5", "metadata": {}, "outputs": [], "source": [ "mssql_row_count = []\n", "bq_row_count = []" ] }, { "cell_type": "code", "execution_count": null, "id": "617c5fa1-f3a2-4322-b3e9-d71e34e9e4de", "metadata": {}, "outputs": [], "source": [ "# get mssql table counts\n", "DB = sqlalchemy.create_engine(\n", " sqlalchemy.engine.url.URL.create(\n", " drivername=SQL_SERVER_DRIVER,\n", " username=SQL_SERVER_USERNAME,\n", " password=SQL_SERVER_PASSWORD,\n", " database=SQL_SERVER_DATABASE,\n", " host=SQL_SERVER_HOST,\n", " port=SQL_SERVER_PORT\n", " )\n", " )\n", "with DB.connect() as conn:\n", " for table in SQL_SERVER_TABLE_LIST:\n", " results = conn.execute(text(\"select count(*) from {}\".format(table))).fetchall()\n", " for row in results:\n", " mssql_row_count.append(row[0])" ] }, { "cell_type": "code", "execution_count": null, "id": "3d1f4420-cd44-4e77-9df7-11bff3733eed", "metadata": {}, "outputs": [], "source": [ "from google.cloud import bigquery\n", "\n", "# Construct a BigQuery client object.\n", "client = bigquery.Client()\n", "\n", "for table in SQL_SERVER_TABLE_LIST:\n", " results = client.query(\"SELECT row_count FROM {}.__TABLES__ where table_id = '{}'\".format(BIGQUERY_DATASET,table.split('.')[1]))\n", " for row in results:\n", " bq_row_count.append(row[0])" ] }, { "cell_type": "code", "execution_count": null, "id": "412c823f-f574-47d3-b4c6-094858763926", "metadata": {}, "outputs": [], "source": [ "statusDF['mssql_row_count'] = mssql_row_count \n", "statusDF['bq_row_count'] = bq_row_count \n", "statusDF" ] }, { "cell_type": "code", "execution_count": null, "id": "566836ff-5cbb-4fcf-8fca-a7f00ef97139", "metadata": {}, "outputs": [], "source": [] } ], "metadata": { "environment": { "kernel": "python3", "name": "common-cpu.m107", "type": "gcloud", "uri": "gcr.io/deeplearning-platform-release/base-cpu:m107" }, "kernelspec": { "display_name": "Python 3 (ipykernel) (Local)", "language": "python", "name": "python3" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 3 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.9.9" }, "vscode": { "interpreter": { "hash": "b0fa6594d8f4cbf19f97940f81e996739fb7646882a419484c72d19e05852a7e" } } }, "nbformat": 4, "nbformat_minor": 5 }