notebooks/mysql2spanner/MySqlToSpanner_notebook.ipynb (903 lines of code) (raw):

{ "cells": [ { "cell_type": "code", "execution_count": null, "id": "98acd907", "metadata": {}, "outputs": [], "source": [ "# Copyright 2022 Google LLC\n", "#\n", "# Licensed under the Apache License, Version 2.0 (the \"License\");\n", "# you may not use this file except in compliance with the License.\n", "# You may obtain a copy of the License at\n", "#\n", "# https://www.apache.org/licenses/LICENSE-2.0\n", "#\n", "# Unless required by applicable law or agreed to in writing, software\n", "# distributed under the License is distributed on an \"AS IS\" BASIS,\n", "# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.\n", "# See the License for the specific language governing permissions and\n", "# limitations under the License." ] }, { "attachments": {}, "cell_type": "markdown", "id": "1db5371a-8f16-47b7-bcc7-5af386e9b6d8", "metadata": {}, "source": [ "# <center>MySQL to Cloud Spanner Migration (or Bulk Load)\n", "<table align=\"left\">\n", "<td>\n", " <a href=\"https://colab.research.google.com/github/GoogleCloudPlatform/dataproc-templates/blob/main/notebooks/mysql2spanner/MySqlToSpanner_notebook.ipynb\">\n", " <img src=\"../images/colab-logo-32px.png\" alt=\"Colab logo\" />Run in Colab\n", " </a>\n", "</td>\n", "<td>\n", " <a href=\"https://console.cloud.google.com/vertex-ai/colab/import/https:%2F%2Fraw.githubusercontent.com%2FGoogleCloudPlatform%2Fdataproc-templates%2Fmain%2Fnotebooks%2Fmysql2spanner%2FMySqlToSpanner_notebook.ipynb\">\n", " <img src=\"../images/colab-enterprise-logo-32px.png\" alt=\"GCP Colab Enterprise logo\" />Run in Colab Enterprise\n", " </a>\n", "</td>\n", "<td>\n", " <a href=\"https://github.com/GoogleCloudPlatform/dataproc-templates/blob/main/notebooks/mysql2spanner/MySqlToSpanner_notebook.ipynb\">\n", " <img src=\"../images/github-logo-32px.png\" alt=\"GitHub logo\" />View on GitHub\n", " </a>\n", "</td>\n", "<td>\n", " <a href=\"https://console.cloud.google.com/vertex-ai/workbench/deploy-notebook?download_url=https://raw.githubusercontent.com/GoogleCloudPlatform/dataproc-templates/main/notebooks/mysql2spanner/MySqlToSpanner_notebook.ipynb\">\n", " <img src=\"../images/vertexai.jpg\" alt=\"Vertex AI logo\" />Open in Vertex AI Workbench\n", " </a>\n", "</td>\n", "</table>" ] }, { "attachments": {}, "cell_type": "markdown", "id": "dd944742", "metadata": {}, "source": [ "#### References\n", "\n", "- [DataprocPySparkBatchOp reference](https://google-cloud-pipeline-components.readthedocs.io/en/google-cloud-pipeline-components-1.0.0/google_cloud_pipeline_components.experimental.dataproc.html)\n", "- [Kubeflow SDK Overview](https://www.kubeflow.org/docs/components/pipelines/sdk/sdk-overview/)\n", "- [Dataproc Serverless in Vertex AI Pipelines tutorial](https://github.com/GoogleCloudPlatform/vertex-ai-samples/blob/main/notebooks/community/ml_ops/stage3/get_started_with_dataproc_serverless_pipeline_components.ipynb)\n", "- [Build a Vertex AI Pipeline](https://cloud.google.com/vertex-ai/docs/pipelines/build-pipeline)\n", "\n", "This notebook is built to run a Vertex AI User-Managed Notebook using the default Compute Engine Service Account. \n", "Check the Dataproc Serverless in Vertex AI Pipelines tutorial linked above to learn how to setup a different Service Account. \n", "\n", "#### Permissions\n", "\n", "Make sure that the service account used to run the notebook has the following roles:\n", "\n", "- roles/aiplatform.serviceAgent\n", "- roles/aiplatform.customCodeServiceAgent\n", "- roles/storage.objectCreator\n", "- roles/storage.objectViewer\n", "- roles/dataproc.editor\n", "- roles/dataproc.worker" ] }, { "attachments": {}, "cell_type": "markdown", "id": "7d89b301-5249-462a-97d8-986488b303fd", "metadata": {}, "source": [ "## Step 1: Install Libraries\n", "#### Run Step 1 one time for each new notebook instance" ] }, { "cell_type": "code", "execution_count": null, "id": "fef65ec2-ad6b-407f-a993-7cdf871bba11", "metadata": { "tags": [] }, "outputs": [], "source": [ "!pip3 install pymysql SQLAlchemy\n", "!pip3 install --upgrade google-cloud-pipeline-components kfp --user -q\n", "!pip3 install google-cloud-spanner\n", "!pip3 install --upgrade google-cloud-storage" ] }, { "cell_type": "code", "execution_count": null, "id": "0e90943f-b965-4f7f-b631-ce62227d5e83", "metadata": { "tags": [] }, "outputs": [], "source": [ "!sudo apt-get update -y\n", "!sudo apt-get install default-jdk -y\n", "!sudo apt-get install maven -y" ] }, { "attachments": {}, "cell_type": "markdown", "id": "35712473-92ef-433b-9ce6-b5649357c09e", "metadata": {}, "source": [ "#### Once you've installed the additional packages, you may need to restart the notebook kernel so it can find the packages.\n", "\n", "Uncomment & Run this cell if you have installed anything from above commands" ] }, { "cell_type": "code", "execution_count": null, "id": "19ef1307-902e-4713-8948-b86084e19312", "metadata": {}, "outputs": [], "source": [ "# import os\n", "# import IPython\n", "# if not os.getenv(\"IS_TESTING\"):\n", "# app = IPython.Application.instance()\n", "# app.kernel.do_shutdown(True)" ] }, { "cell_type": "markdown", "id": "b5f21db0", "metadata": {}, "source": [ "Uncomment & Run this cell if you are running from Colab" ] }, { "cell_type": "code", "execution_count": null, "id": "ba566c42", "metadata": {}, "outputs": [], "source": [ "# !git clone https://github.com/GoogleCloudPlatform/dataproc-templates.git\n", "# !mv /content/dataproc-templates/notebooks/util /content/\n", "# !mv /content/dataproc-templates/java/ /content/" ] }, { "attachments": {}, "cell_type": "markdown", "id": "70d01e33-9099-4d2e-b57e-575c3a998d84", "metadata": {}, "source": [ "## Step 2: Import Libraries" ] }, { "cell_type": "code", "execution_count": null, "id": "2703b502-1b41-44f1-bf21-41069255bc32", "metadata": { "tags": [] }, "outputs": [], "source": [ "from datetime import datetime\n", "import os\n", "from pathlib import Path\n", "import sys\n", "import time\n", "\n", "import google.cloud.aiplatform as aiplatform\n", "from kfp import dsl\n", "from kfp import compiler\n", "\n", "try:\n", " from google_cloud_pipeline_components.experimental.dataproc import DataprocSparkBatchOp\n", "except ModuleNotFoundError:\n", " from google_cloud_pipeline_components.v1.dataproc import DataprocSparkBatchOp\n", " \n", "import pandas as pd\n", "import pymysql\n", "import sqlalchemy\n", "\n", "module_path = os.path.abspath(os.pardir)\n", "if module_path not in sys.path:\n", " sys.path.append(module_path)\n", "\n", "from util.jdbc.jdbc_input_manager import JDBCInputManager\n", "from util.jdbc import jdbc_input_manager_interface\n", "from util import notebook_functions" ] }, { "attachments": {}, "cell_type": "markdown", "id": "09c4a209-db59-42f6-bba7-30cd46b16bad", "metadata": {}, "source": [ "## Step 3: Assign Parameters" ] }, { "attachments": {}, "cell_type": "markdown", "id": "92d3fbd8-013f-45e6-b7e9-8f31a4580e91", "metadata": { "tags": [] }, "source": [ "### Step 3.1 Common Parameters\n", " \n", "- PROJECT : GCP project-id\n", "- REGION : GCP region (us-central1)\n", "- GCS_STAGING_LOCATION : Cloud Storage staging location to be used for this notebook to store artifacts \n", "- SUBNET : VPC subnet\n", "- JARS : list of jars. For this notebook mysql connector and avro jar is required in addition with the dataproc template jars\n", "- MAX_PARALLELISM : Parameter for number of jobs to run in parallel default value is 2\n", "- DATAPROC_SERVICE_ACCOUNT : Service account which will run serverless dataproc batch job" ] }, { "cell_type": "code", "execution_count": null, "id": "f4bfadaf-8122-446d-91be-9603a8efcc35", "metadata": { "tags": [ "parameters" ] }, "outputs": [], "source": [ "IS_PARAMETERIZED = False" ] }, { "cell_type": "code", "execution_count": null, "id": "bd8f6dd9-2e13-447c-b28d-10fa2321b759", "metadata": { "tags": [] }, "outputs": [], "source": [ "if not IS_PARAMETERIZED:\n", " PROJECT = \"\"\n", " REGION = \"\" # eg: us-central1 (any valid GCP region)\n", " GCS_STAGING_LOCATION = \"\" # eg: gs://my-staging-bucket/sub-folder\n", " SUBNET = \"projects/{project}/regions/{region}/subnetworks/{subnet}\"\n", " MAX_PARALLELISM = 5 # max number of tables which will migrated parallelly \n", " DATAPROC_SERVICE_ACCOUNT = \"\" # eg: test@project_id.iam.gserviceaccount.com\n", "\n", "# Do not change this parameter unless you want to refer below JARS from new location\n", "JARS = [GCS_STAGING_LOCATION + \"/jars/mysql-connector-java-8.0.29.jar\",\"file:///usr/lib/spark/external/spark-avro.jar\"]" ] }, { "attachments": {}, "cell_type": "markdown", "id": "051df2af-bd8b-47c7-8cb2-05404ca0d859", "metadata": {}, "source": [ "### Step 3.2 MySQL to Spanner Parameters\n", "- MYSQL_HOST: MySQL instance ip address\n", "- MYSQL_PORT: MySQL instance port\n", "- MYSQL_USERNAME: MySQL username\n", "- MYSQL_PASSWORD: MySQL password\n", "- MYSQL_DATABASE: Name of database that you want to migrate\n", "- MYSQL_TABLE_LIST: List of tables you want to migrate e.g.: ['table1','table2'] else provide an empty list for migration whole database e.g.: [] \n", "- MYSQL_READ_PARTITION_COLUMNS: Dictionary of custom read partition columns, e.g.: {'table2': 'secondary_id'}\n", "- MYSQL_OUTPUT_SPANNER_MODE: Output mode for MySQL data one of (overwrite|append). Use append if schema already exists in Spanner\n", "- SPANNER_INSTANCE: Cloud Spanner instance name\n", "- SPANNER_DATABASE: Cloud Spanner database name\n", "\n", "Spanner requires primary key for each table\n", "- SPANNER_TABLE_PRIMARY_KEYS: Dictionary of format {\"table_name\": \"primary_key_column1,primary_key_column2\"} for tables which do not have primary key in MySQL" ] }, { "cell_type": "code", "execution_count": null, "id": "71dd2824-e9a0-4ceb-a3c9-32f79973432a", "metadata": { "tags": [] }, "outputs": [], "source": [ "if not IS_PARAMETERIZED:\n", " MYSQL_HOST = \"\"\n", " MYSQL_PORT = \"3306\"\n", " MYSQL_USERNAME = \"\"\n", " MYSQL_PASSWORD = \"\"\n", " MYSQL_DATABASE = \"\"\n", " MYSQL_TABLE_LIST = [] # Leave list empty for migrating complete database else provide tables as ['table1','table2']\n", " MYSQL_READ_PARTITION_COLUMNS = {} # Leave empty for default read partition columns\n", " MYSQL_OUTPUT_SPANNER_MODE = \"overwrite\" # one of overwrite|append (Use append when schema already exists in Spanner)\n", "\n", " SPANNER_INSTANCE = \"\"\n", " SPANNER_DATABASE = \"\"\n", " SPANNER_TABLE_PRIMARY_KEYS = {} # Provide tables which do not have PK in MySQL {\"table_name\":\"primary_key_column1,primary_key_column2\"}" ] }, { "attachments": {}, "cell_type": "markdown", "id": "166b1536-d58e-423b-b3c2-cc0c171d275e", "metadata": {}, "source": [ "### Step 3.3 Notebook Configuration Parameters\n", "Below variables should not be changed unless required" ] }, { "cell_type": "code", "execution_count": null, "id": "367f66b6", "metadata": {}, "outputs": [], "source": [ "cur_path = Path(os.getcwd())\n", "\n", "if IS_PARAMETERIZED:\n", " WORKING_DIRECTORY = os.path.join(cur_path.parent ,'java')\n", "else:\n", " WORKING_DIRECTORY = os.path.join(cur_path.parent.parent ,'java')\n", "\n", "# If the above code doesn't fetches the correct path please\n", "# provide complete path to python folder in your dataproc \n", "# template repo which you cloned \n", "\n", "# WORKING_DIRECTORY = \"/home/jupyter/dataproc-templates/java/\"\n", "print(WORKING_DIRECTORY)" ] }, { "cell_type": "code", "execution_count": null, "id": "c6f0f037-e888-4479-a143-f06a39bd5cc1", "metadata": { "tags": [] }, "outputs": [], "source": [ "PYMYSQL_DRIVER = \"mysql+pymysql\"\n", "JDBC_DRIVER = \"com.mysql.cj.jdbc.Driver\"\n", "JDBC_URL = \"jdbc:mysql://{}:{}/{}?user={}&password={}\".format(MYSQL_HOST,MYSQL_PORT,MYSQL_DATABASE,MYSQL_USERNAME,MYSQL_PASSWORD)\n", "MAIN_CLASS = \"com.google.cloud.dataproc.templates.main.DataProcTemplate\"\n", "JAR_FILE = \"dataproc-templates-1.0-SNAPSHOT.jar\"\n", "LOG4J_PROPERTIES_PATH = \"./src/test/resources\"\n", "LOG4J_PROPERTIES = \"log4j-spark-driver-template.properties\"\n", "PIPELINE_ROOT = GCS_STAGING_LOCATION + \"/pipeline_root/dataproc_pyspark\"\n", "\n", "# Adding Dataproc template JAR\n", "JARS.append(GCS_STAGING_LOCATION + \"/\" + JAR_FILE)" ] }, { "attachments": {}, "cell_type": "markdown", "id": "115c062b-5a91-4372-b440-5c37a12fbf87", "metadata": {}, "source": [ "## Step 4: Generate MySQL Table List\n", "This step creates list of tables for migration. If MYSQL_TABLE_LIST is empty then all the tables in the MYSQL_DATABASE are listed for migration otherwise the provided list is used" ] }, { "cell_type": "code", "execution_count": null, "id": "d0e362ac-30cd-4857-9e2a-0e9eb926e627", "metadata": { "tags": [] }, "outputs": [], "source": [ "DB = sqlalchemy.create_engine(\n", " sqlalchemy.engine.url.URL.create(\n", " drivername=PYMYSQL_DRIVER,\n", " username=MYSQL_USERNAME,\n", " password=MYSQL_PASSWORD,\n", " database=MYSQL_DATABASE,\n", " host=MYSQL_HOST,\n", " port=MYSQL_PORT\n", " )\n", ")\n", "input_mgr = JDBCInputManager.create(\"mysql\", DB)\n", "\n", "# Retrieve list of tables from database.\n", "MYSQL_TABLE_LIST = input_mgr.build_table_list(schema_filter=MYSQL_DATABASE, table_filter=MYSQL_TABLE_LIST)\n", "print(f\"Total tables to migrate from schema {MYSQL_DATABASE}:\", len(MYSQL_TABLE_LIST))\n", " \n", "print(\"List of tables for migration:\")\n", "print(MYSQL_TABLE_LIST)" ] }, { "attachments": {}, "cell_type": "markdown", "id": "1d9a62e8-7499-41c6-b32b-73b539b0c7c4", "metadata": {}, "source": [ "## Step 5: Get Primary Keys for Tables Not Present in SPANNER_TABLE_PRIMARY_KEYS\n", "For tables which do not have primary key provided in dictionary SPANNER_TABLE_PRIMARY_KEYS this step fetches primary key from MYSQL_DATABASE" ] }, { "cell_type": "code", "execution_count": null, "id": "6eda8fac-582c-4d4a-b871-311bb2863335", "metadata": { "tags": [] }, "outputs": [], "source": [ "for table_name, pk_columns in input_mgr.get_primary_keys().items():\n", " notebook_functions.update_spanner_primary_keys(SPANNER_TABLE_PRIMARY_KEYS, table_name, pk_columns)\n", "\n", "notebook_functions.remove_unexpected_spanner_primary_keys(SPANNER_TABLE_PRIMARY_KEYS, MYSQL_TABLE_LIST)" ] }, { "cell_type": "code", "execution_count": null, "id": "7c2a210f-48da-474f-bf46-89e755d01c67", "metadata": { "tags": [] }, "outputs": [], "source": [ "pkDF = pd.DataFrame({\"table\" : MYSQL_TABLE_LIST,\n", " \"primary_keys\": [SPANNER_TABLE_PRIMARY_KEYS.get(_) for _ in MYSQL_TABLE_LIST]})\n", "print(\"Below are identified primary keys for migrating MySQL table to Spanner:\")\n", "pkDF" ] }, { "attachments": {}, "cell_type": "markdown", "id": "02748c28-54e9-466c-9537-c00569122a96", "metadata": {}, "source": [ "## Step 6 Identify Read Partition Columns\n", "This step uses PARTITION_THRESHOLD (default value is 1 million) parameter and any table having rows greater than PARTITION_THRESHOLD will be used for partitioned read based on Primary Keys\n", " - PARTITION_OPTIONS: List will have table and its partitioned column and Spark SQL settings if exceeds the threshold" ] }, { "cell_type": "code", "execution_count": null, "id": "b4e4f6e9-c559-48e8-95fd-d2dd3e173439", "metadata": {}, "outputs": [], "source": [ "PARTITION_THRESHOLD = 200000 # Number of rows fetched per spark executor\n", "PARTITION_OPTIONS = input_mgr.define_read_partitioning(\n", " PARTITION_THRESHOLD, custom_partition_columns=MYSQL_READ_PARTITION_COLUMNS\n", ")\n", "input_mgr.read_partitioning_df(PARTITION_OPTIONS)" ] }, { "attachments": {}, "cell_type": "markdown", "id": "0d9bb170-09c4-40d1-baaf-9e907f215889", "metadata": {}, "source": [ "## Step 7: Calculate Parallel Jobs for MySQL to Cloud Spanner\n", "This step uses MAX_PARALLELISM parameter to calculate number of parallel jobs to run" ] }, { "cell_type": "code", "execution_count": null, "id": "2c501db0-c1fb-4a05-88b8-a7e546e2b1d0", "metadata": { "tags": [] }, "outputs": [], "source": [ "# Calculate parallel jobs:\n", "JOB_LIST = notebook_functions.split_list(input_mgr.get_table_list(), MAX_PARALLELISM)\n", "print(\"List of tables for execution:\")\n", "print(JOB_LIST)" ] }, { "attachments": {}, "cell_type": "markdown", "id": "1fa5f841-a687-4723-a8e6-6e7e752ba36e", "metadata": {}, "source": [ "## Step 8: Create JAR files and Upload to Cloud Storage\n", "#### Run Step 8 one time for each new notebook instance" ] }, { "cell_type": "code", "execution_count": null, "id": "22220ae3-9fb4-471c-b5aa-f606deeca15e", "metadata": { "tags": [] }, "outputs": [], "source": [ "%cd $WORKING_DIRECTORY" ] }, { "attachments": {}, "cell_type": "markdown", "id": "bdee7afc-699b-4c1a-aeec-df0f99764ae0", "metadata": {}, "source": [ "#### Setting PATH variables for JDK and Maven and executing MAVEN build" ] }, { "cell_type": "code", "execution_count": null, "id": "ae36ebf4-5b9b-4341-b1c2-21c7e64f1051", "metadata": {}, "outputs": [], "source": [ "!wget https://downloads.mysql.com/archives/get/p/3/file/mysql-connector-java-8.0.29.tar.gz\n", "!tar -xf mysql-connector-java-8.0.29.tar.gz\n", "!mvn clean spotless:apply install -DskipTests " ] }, { "attachments": {}, "cell_type": "markdown", "id": "9e1a779f-2c39-42ec-98be-0f5e9d715447", "metadata": {}, "source": [ "#### Copying JARS Files to GCS_STAGING_LOCATION" ] }, { "cell_type": "code", "execution_count": null, "id": "939cdcd5-0f3e-4f51-aa78-93d1976cb0f4", "metadata": { "tags": [] }, "outputs": [], "source": [ "!gsutil cp target/$JAR_FILE $GCS_STAGING_LOCATION/$JAR_FILE\n", "!gsutil cp $LOG4J_PROPERTIES_PATH/$LOG4J_PROPERTIES $GCS_STAGING_LOCATION/$LOG4J_PROPERTIES\n", "!gsutil cp mysql-connector-java-8.0.29/mysql-connector-java-8.0.29.jar $GCS_STAGING_LOCATION/jars/mysql-connector-java-8.0.29.jar" ] }, { "attachments": {}, "cell_type": "markdown", "id": "78f6f83b-891a-4515-a1d6-f3406a25dc2a", "metadata": {}, "source": [ "## Step 9: Execute Pipeline to Migrate Tables from MySQL to Spanner" ] }, { "cell_type": "code", "execution_count": null, "id": "b51912cd-17cb-4607-a1e3-9a4a599cd611", "metadata": { "tags": [] }, "outputs": [], "source": [ "mysql_to_spanner_jobs = []" ] }, { "cell_type": "code", "execution_count": null, "id": "11e8a699-317d-46df-a4b1-7132e14ccdf4", "metadata": {}, "outputs": [], "source": [ "def migrate_mysql_to_spanner(EXECUTION_LIST):\n", " EXECUTION_LIST = EXECUTION_LIST\n", " aiplatform.init(project=PROJECT,staging_bucket=GCS_STAGING_LOCATION)\n", " \n", " @dsl.pipeline(\n", " name=\"java-mysql-to-spanner-spark\",\n", " description=\"Pipeline to get data from MySQL to Cloud Spanner\",\n", " )\n", " def pipeline(\n", " PROJECT_ID: str = PROJECT,\n", " LOCATION: str = REGION,\n", " MAIN_CLASS: str = MAIN_CLASS,\n", " JAR_FILE_URIS: list = JARS,\n", " SUBNETWORK_URI: str = SUBNET,\n", " FILE_URIS: list = [GCS_STAGING_LOCATION + \"/\" + LOG4J_PROPERTIES]\n", " ):\n", " for table in EXECUTION_LIST:\n", " BATCH_ID = \"mysql2spanner-{}-{}\".format(table,datetime.now().strftime(\"%s\")).replace('_','-').lower()\n", " mysql_to_spanner_jobs.append(BATCH_ID)\n", " if table in PARTITION_OPTIONS.keys():\n", " partition_options = PARTITION_OPTIONS[table]\n", " TEMPLATE_SPARK_ARGS = [\n", " \"--template=JDBCTOSPANNER\",\n", " \"--templateProperty\", \"project.id={}\".format(PROJECT),\n", " \"--templateProperty\", \"jdbctospanner.jdbc.url={}\".format(JDBC_URL),\n", " \"--templateProperty\", \"jdbctospanner.jdbc.driver.class.name={}\".format(JDBC_DRIVER),\n", " \"--templateProperty\", \"jdbctospanner.sql=select * from {}\".format(table),\n", " \"--templateProperty\", \"jdbctospanner.output.instance={}\".format(SPANNER_INSTANCE),\n", " \"--templateProperty\", \"jdbctospanner.output.database={}\".format(SPANNER_DATABASE),\n", " \"--templateProperty\", \"jdbctospanner.output.table={}\".format(table),\n", " \"--templateProperty\", \"jdbctospanner.output.saveMode={}\".format(MYSQL_OUTPUT_SPANNER_MODE.capitalize()),\n", " \"--templateProperty\", \"jdbctospanner.output.primaryKey={}\".format(SPANNER_TABLE_PRIMARY_KEYS[table]),\n", " \"--templateProperty\", \"jdbctospanner.output.batchInsertSize=200\",\n", " \"--templateProperty\", \"jdbctospanner.sql.partitionColumn={}\".format(partition_options[jdbc_input_manager_interface.SPARK_PARTITION_COLUMN]),\n", " \"--templateProperty\", \"jdbctospanner.sql.lowerBound={}\".format(partition_options[jdbc_input_manager_interface.SPARK_LOWER_BOUND]),\n", " \"--templateProperty\", \"jdbctospanner.sql.upperBound={}\".format(partition_options[jdbc_input_manager_interface.SPARK_UPPER_BOUND]),\n", " \"--templateProperty\", \"jdbctospanner.sql.numPartitions={}\".format(partition_options[jdbc_input_manager_interface.SPARK_NUM_PARTITIONS]),\n", " ]\n", " else:\n", " TEMPLATE_SPARK_ARGS = [\n", " \"--template=JDBCTOSPANNER\",\n", " \"--templateProperty\", \"project.id={}\".format(PROJECT),\n", " \"--templateProperty\", \"jdbctospanner.jdbc.url={}\".format(JDBC_URL),\n", " \"--templateProperty\", \"jdbctospanner.jdbc.driver.class.name={}\".format(JDBC_DRIVER),\n", " \"--templateProperty\", \"jdbctospanner.sql=select * from {}\".format(table),\n", " \"--templateProperty\", \"jdbctospanner.output.instance={}\".format(SPANNER_INSTANCE),\n", " \"--templateProperty\", \"jdbctospanner.output.database={}\".format(SPANNER_DATABASE),\n", " \"--templateProperty\", \"jdbctospanner.output.table={}\".format(table),\n", " \"--templateProperty\", \"jdbctospanner.output.saveMode={}\".format(MYSQL_OUTPUT_SPANNER_MODE.capitalize()),\n", " \"--templateProperty\", \"jdbctospanner.output.primaryKey={}\".format(SPANNER_TABLE_PRIMARY_KEYS[table]),\n", " \"--templateProperty\", \"jdbctospanner.output.batchInsertSize=200\",\n", " ]\n", " _ = DataprocSparkBatchOp(\n", " project=PROJECT_ID,\n", " location=LOCATION,\n", " batch_id=BATCH_ID,\n", " main_class=MAIN_CLASS,\n", " jar_file_uris=JAR_FILE_URIS,\n", " file_uris=FILE_URIS,\n", " subnetwork_uri=SUBNETWORK_URI,\n", " runtime_config_version=\"1.1\", # issue 665\n", " service_account=DATAPROC_SERVICE_ACCOUNT,\n", " args=TEMPLATE_SPARK_ARGS\n", " )\n", " time.sleep(1)\n", "\n", " compiler.Compiler().compile(pipeline_func=pipeline, package_path=\"pipeline.json\")\n", "\n", " pipeline = aiplatform.PipelineJob(\n", " display_name=\"pipeline\",\n", " template_path=\"pipeline.json\",\n", " pipeline_root=PIPELINE_ROOT,\n", " enable_caching=False,\n", " location=REGION,\n", " )\n", " # run() method has an optional parameter `service_account` which you can pass if you want to run pipeline using\n", " # specific service account instead of default service account \n", " # eg. pipeline.run(service_account='test@project_id.iam.gserviceaccount.com')\n", " pipeline.run()" ] }, { "cell_type": "code", "execution_count": null, "id": "e696ce34-b5a3-4f5d-98a7-ac881007c6c5", "metadata": { "tags": [] }, "outputs": [], "source": [ "for execution_list in JOB_LIST:\n", " print(execution_list)\n", " migrate_mysql_to_spanner(execution_list)" ] }, { "attachments": {}, "cell_type": "markdown", "id": "9ce7f828-dacc-404b-8927-dc3813e7216a", "metadata": {}, "source": [ "## Step 10: Get status for tables migrated from MySql to Spanner" ] }, { "cell_type": "code", "execution_count": null, "id": "c282b2b9-b126-4cb6-a513-14a6322650c0", "metadata": { "tags": [] }, "outputs": [], "source": [ "def get_bearer_token():\n", " \n", " try:\n", " #Defining Scope\n", " CREDENTIAL_SCOPES = [\"https://www.googleapis.com/auth/cloud-platform\"]\n", "\n", " #Assining credentials and project value\n", " credentials, project_id = google.auth.default(scopes=CREDENTIAL_SCOPES)\n", "\n", " #Refreshing credentials data\n", " credentials.refresh(requests.Request())\n", "\n", " #Get refreshed token\n", " token = credentials.token\n", " if token:\n", " return (token,200)\n", " else:\n", " return \"Bearer token not generated\"\n", " except Exception as error:\n", " return (\"Bearer token not generated. Error : {}\".format(error),500)" ] }, { "cell_type": "code", "execution_count": null, "id": "d1fcbc63-19db-42a8-a2ed-d9855da00c04", "metadata": { "tags": [] }, "outputs": [], "source": [ "from google.auth.transport import requests\n", "import google\n", "token = get_bearer_token()\n", "if token[1] == 200:\n", " print(\"Bearer token generated\")\n", "else:\n", " print(token)" ] }, { "cell_type": "code", "execution_count": null, "id": "5be3cf87-6d28-4b23-8466-87d3399f7a29", "metadata": { "tags": [] }, "outputs": [], "source": [ "import requests\n", "\n", "mysql_to_spanner_status = []\n", "job_status_url = \"https://dataproc.googleapis.com/v1/projects/{}/locations/{}/batches/{}\"\n", "for job in mysql_to_spanner_jobs:\n", " auth = \"Bearer \" + token[0]\n", " url = job_status_url.format(PROJECT,REGION,job)\n", " headers = {\n", " 'Content-Type': 'application/json; charset=UTF-8',\n", " 'Authorization': auth \n", " }\n", " response = requests.get(url, headers=headers)\n", " mysql_to_spanner_status.append(response.json()['state'])" ] }, { "cell_type": "code", "execution_count": null, "id": "1097575d-07c2-4659-a75f-d7e898e3f077", "metadata": { "tags": [] }, "outputs": [], "source": [ "statusDF = pd.DataFrame({\"table\": MYSQL_TABLE_LIST, \"mysql_to_spanner_job\" : mysql_to_spanner_jobs, \"mysql_to_spanner_status\": mysql_to_spanner_status})\n", "statusDF" ] }, { "attachments": {}, "cell_type": "markdown", "id": "0961f164-c7e4-4bb5-80f0-25fd1051147b", "metadata": {}, "source": [ "## Step 11: Validate Row Counts of Migrated Tables from MySQL to Cloud Spanner" ] }, { "cell_type": "code", "execution_count": null, "id": "25299344-c167-4764-a5d1-56c1b384d104", "metadata": { "tags": [] }, "outputs": [], "source": [ "# Get MySQL table counts\n", "mysql_row_count = input_mgr.get_table_list_with_counts()" ] }, { "cell_type": "code", "execution_count": null, "id": "ab0e539d-5180-4f5b-915e-35f7ea45e0d3", "metadata": { "tags": [] }, "outputs": [], "source": [ "# Get Cloud Spanner table counts\n", "spanner_row_count = []\n", "from google.cloud import spanner\n", "\n", "spanner_client = spanner.Client()\n", "instance = spanner_client.instance(SPANNER_INSTANCE)\n", "database = instance.database(SPANNER_DATABASE)\n", "\n", "for table in MYSQL_TABLE_LIST:\n", " with database.snapshot() as snapshot:\n", " qry = \"@{{USE_ADDITIONAL_PARALLELISM=true}} select count(1) from {}\".format(table)\n", " results = snapshot.execute_sql(qry)\n", " for row in results:\n", " spanner_row_count.append(row[0])" ] }, { "cell_type": "code", "execution_count": null, "id": "4b1afe12-3eb9-4133-8377-66dc63ac649c", "metadata": { "tags": [] }, "outputs": [], "source": [ "statusDF['mysql_row_count'] = mysql_row_count \n", "statusDF['spanner_row_count'] = spanner_row_count \n", "statusDF" ] }, { "attachments": {}, "cell_type": "markdown", "id": "9f30ffe4-efb5-4b84-b364-c40a296e95f7", "metadata": {}, "source": [ "## Post data loading activities\n", "- You may create relationships (FKs), constraints and indexes (as needed).\n", "- You may configure countinuous replication with [DataStream](https://cloud.google.com/datastream/docs/configure-your-source-mysql-database) or any other 3rd party tools." ] }, { "cell_type": "code", "execution_count": null, "id": "89ec62c1-0b95-4536-9339-03a4a8de035e", "metadata": {}, "outputs": [], "source": [] } ], "metadata": { "environment": { "kernel": "python3", "name": "common-cpu.m107", "type": "gcloud", "uri": "gcr.io/deeplearning-platform-release/base-cpu:m107" }, "kernelspec": { "display_name": "Python 3 (ipykernel) (Local)", "language": "python", "name": "python3" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 3 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.9.9" }, "vscode": { "interpreter": { "hash": "31f2aee4e71d21fbe5cf8b01ff0e069b9275f58929596ceb00d14d90e3e16cd6" } } }, "nbformat": 4, "nbformat_minor": 5 }