{
"cells": [
{
"cell_type": "code",
"execution_count": null,
"id": "98acd907",
"metadata": {},
"outputs": [],
"source": [
"# Copyright 2022 Google LLC\n",
"#\n",
"# Licensed under the Apache License, Version 2.0 (the \"License\");\n",
"# you may not use this file except in compliance with the License.\n",
"# You may obtain a copy of the License at\n",
"#\n",
"# https://www.apache.org/licenses/LICENSE-2.0\n",
"#\n",
"# Unless required by applicable law or agreed to in writing, software\n",
"# distributed under the License is distributed on an \"AS IS\" BASIS,\n",
"# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.\n",
"# See the License for the specific language governing permissions and\n",
"# limitations under the License."
]
},
{
"attachments": {},
"cell_type": "markdown",
"id": "1db5371a-8f16-47b7-bcc7-5af386e9b6d8",
"metadata": {},
"source": [
"# <center>MySQL to Cloud Spanner Migration (or Bulk Load)\n",
"<table align=\"left\">\n",
"<td>\n",
" <a href=\"https://colab.research.google.com/github/GoogleCloudPlatform/dataproc-templates/blob/main/notebooks/mysql2spanner/MySqlToSpanner_notebook.ipynb\">\n",
" <img src=\"../images/colab-logo-32px.png\" alt=\"Colab logo\" />Run in Colab\n",
" </a>\n",
"</td>\n",
"<td>\n",
" <a href=\"https://console.cloud.google.com/vertex-ai/colab/import/https:%2F%2Fraw.githubusercontent.com%2FGoogleCloudPlatform%2Fdataproc-templates%2Fmain%2Fnotebooks%2Fmysql2spanner%2FMySqlToSpanner_notebook.ipynb\">\n",
" <img src=\"../images/colab-enterprise-logo-32px.png\" alt=\"GCP Colab Enterprise logo\" />Run in Colab Enterprise\n",
" </a>\n",
"</td>\n",
"<td>\n",
" <a href=\"https://github.com/GoogleCloudPlatform/dataproc-templates/blob/main/notebooks/mysql2spanner/MySqlToSpanner_notebook.ipynb\">\n",
" <img src=\"../images/github-logo-32px.png\" alt=\"GitHub logo\" />View on GitHub\n",
" </a>\n",
"</td>\n",
"<td>\n",
" <a href=\"https://console.cloud.google.com/vertex-ai/workbench/deploy-notebook?download_url=https://raw.githubusercontent.com/GoogleCloudPlatform/dataproc-templates/main/notebooks/mysql2spanner/MySqlToSpanner_notebook.ipynb\">\n",
" <img src=\"../images/vertexai.jpg\" alt=\"Vertex AI logo\" />Open in Vertex AI Workbench\n",
" </a>\n",
"</td>\n",
"</table>"
]
},
{
"attachments": {},
"cell_type": "markdown",
"id": "dd944742",
"metadata": {},
"source": [
"#### References\n",
"\n",
"- [DataprocPySparkBatchOp reference](https://google-cloud-pipeline-components.readthedocs.io/en/google-cloud-pipeline-components-1.0.0/google_cloud_pipeline_components.experimental.dataproc.html)\n",
"- [Kubeflow SDK Overview](https://www.kubeflow.org/docs/components/pipelines/sdk/sdk-overview/)\n",
"- [Dataproc Serverless in Vertex AI Pipelines tutorial](https://github.com/GoogleCloudPlatform/vertex-ai-samples/blob/main/notebooks/community/ml_ops/stage3/get_started_with_dataproc_serverless_pipeline_components.ipynb)\n",
"- [Build a Vertex AI Pipeline](https://cloud.google.com/vertex-ai/docs/pipelines/build-pipeline)\n",
"\n",
"This notebook is built to run a Vertex AI User-Managed Notebook using the default Compute Engine Service Account. \n",
"Check the Dataproc Serverless in Vertex AI Pipelines tutorial linked above to learn how to setup a different Service Account. \n",
"\n",
"#### Permissions\n",
"\n",
"Make sure that the service account used to run the notebook has the following roles:\n",
"\n",
"- roles/aiplatform.serviceAgent\n",
"- roles/aiplatform.customCodeServiceAgent\n",
"- roles/storage.objectCreator\n",
"- roles/storage.objectViewer\n",
"- roles/dataproc.editor\n",
"- roles/dataproc.worker"
]
},
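{
"attachments": {},
"cell_type": "markdown",
"id": "a7c31f02",
"metadata": {},
"source": [
"The optional cell below is an illustrative sketch of how these roles could be granted with gcloud. `YOUR_PROJECT_ID` and `YOUR_SA_EMAIL` are placeholders, not real values; adjust the commands to your own project and IAM policies before uncommenting."
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "b8d42e13",
"metadata": {},
"outputs": [],
"source": [
"# Illustrative sketch: grant the required roles to the service account (placeholders, not real values).\n",
"# Uncomment and repeat the command for each of the roles listed above.\n",
"\n",
"# !gcloud projects add-iam-policy-binding YOUR_PROJECT_ID --member=\"serviceAccount:YOUR_SA_EMAIL\" --role=\"roles/dataproc.editor\"\n",
"# !gcloud projects add-iam-policy-binding YOUR_PROJECT_ID --member=\"serviceAccount:YOUR_SA_EMAIL\" --role=\"roles/dataproc.worker\""
]
},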
{
"attachments": {},
"cell_type": "markdown",
"id": "7d89b301-5249-462a-97d8-986488b303fd",
"metadata": {},
"source": [
"## Step 1: Install Libraries\n",
"#### Run Step 1 one time for each new notebook instance"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "fef65ec2-ad6b-407f-a993-7cdf871bba11",
"metadata": {
"tags": []
},
"outputs": [],
"source": [
"!pip3 install pymysql SQLAlchemy\n",
"!pip3 install --upgrade google-cloud-pipeline-components kfp --user -q\n",
"!pip3 install google-cloud-spanner\n",
"!pip3 install --upgrade google-cloud-storage"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "0e90943f-b965-4f7f-b631-ce62227d5e83",
"metadata": {
"tags": []
},
"outputs": [],
"source": [
"!sudo apt-get update -y\n",
"!sudo apt-get install default-jdk -y\n",
"!sudo apt-get install maven -y"
]
},
{
"attachments": {},
"cell_type": "markdown",
"id": "35712473-92ef-433b-9ce6-b5649357c09e",
"metadata": {},
"source": [
"#### Once you've installed the additional packages, you may need to restart the notebook kernel so it can find the packages.\n",
"\n",
"Uncomment & Run this cell if you have installed anything from above commands"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "19ef1307-902e-4713-8948-b86084e19312",
"metadata": {},
"outputs": [],
"source": [
"# import os\n",
"# import IPython\n",
"# if not os.getenv(\"IS_TESTING\"):\n",
"# app = IPython.Application.instance()\n",
"# app.kernel.do_shutdown(True)"
]
},
{
"cell_type": "markdown",
"id": "b5f21db0",
"metadata": {},
"source": [
"Uncomment & Run this cell if you are running from Colab"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "ba566c42",
"metadata": {},
"outputs": [],
"source": [
"# !git clone https://github.com/GoogleCloudPlatform/dataproc-templates.git\n",
"# !mv /content/dataproc-templates/notebooks/util /content/\n",
"# !mv /content/dataproc-templates/java/ /content/"
]
},
{
"attachments": {},
"cell_type": "markdown",
"id": "70d01e33-9099-4d2e-b57e-575c3a998d84",
"metadata": {},
"source": [
"## Step 2: Import Libraries"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "2703b502-1b41-44f1-bf21-41069255bc32",
"metadata": {
"tags": []
},
"outputs": [],
"source": [
"from datetime import datetime\n",
"import os\n",
"from pathlib import Path\n",
"import sys\n",
"import time\n",
"\n",
"import google.cloud.aiplatform as aiplatform\n",
"from kfp import dsl\n",
"from kfp import compiler\n",
"\n",
"try:\n",
" from google_cloud_pipeline_components.experimental.dataproc import DataprocSparkBatchOp\n",
"except ModuleNotFoundError:\n",
" from google_cloud_pipeline_components.v1.dataproc import DataprocSparkBatchOp\n",
" \n",
"import pandas as pd\n",
"import pymysql\n",
"import sqlalchemy\n",
"\n",
"module_path = os.path.abspath(os.pardir)\n",
"if module_path not in sys.path:\n",
" sys.path.append(module_path)\n",
"\n",
"from util.jdbc.jdbc_input_manager import JDBCInputManager\n",
"from util.jdbc import jdbc_input_manager_interface\n",
"from util import notebook_functions"
]
},
{
"attachments": {},
"cell_type": "markdown",
"id": "09c4a209-db59-42f6-bba7-30cd46b16bad",
"metadata": {},
"source": [
"## Step 3: Assign Parameters"
]
},
{
"attachments": {},
"cell_type": "markdown",
"id": "92d3fbd8-013f-45e6-b7e9-8f31a4580e91",
"metadata": {
"tags": []
},
"source": [
"### Step 3.1 Common Parameters\n",
" \n",
"- PROJECT : GCP project-id\n",
"- REGION : GCP region (us-central1)\n",
"- GCS_STAGING_LOCATION : Cloud Storage staging location to be used for this notebook to store artifacts \n",
"- SUBNET : VPC subnet\n",
"- JARS : list of jars. For this notebook mysql connector and avro jar is required in addition with the dataproc template jars\n",
"- MAX_PARALLELISM : Parameter for number of jobs to run in parallel default value is 2\n",
"- DATAPROC_SERVICE_ACCOUNT : Service account which will run serverless dataproc batch job"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "f4bfadaf-8122-446d-91be-9603a8efcc35",
"metadata": {
"tags": [
"parameters"
]
},
"outputs": [],
"source": [
"IS_PARAMETERIZED = False"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "bd8f6dd9-2e13-447c-b28d-10fa2321b759",
"metadata": {
"tags": []
},
"outputs": [],
"source": [
"if not IS_PARAMETERIZED:\n",
" PROJECT = \"\"\n",
" REGION = \"\" # eg: us-central1 (any valid GCP region)\n",
" GCS_STAGING_LOCATION = \"\" # eg: gs://my-staging-bucket/sub-folder\n",
" SUBNET = \"projects/{project}/regions/{region}/subnetworks/{subnet}\"\n",
" MAX_PARALLELISM = 5 # max number of tables which will migrated parallelly \n",
" DATAPROC_SERVICE_ACCOUNT = \"\" # eg: test@project_id.iam.gserviceaccount.com\n",
"\n",
"# Do not change this parameter unless you want to refer below JARS from new location\n",
"JARS = [GCS_STAGING_LOCATION + \"/jars/mysql-connector-java-8.0.29.jar\",\"file:///usr/lib/spark/external/spark-avro.jar\"]"
]
},
{
"attachments": {},
"cell_type": "markdown",
"id": "051df2af-bd8b-47c7-8cb2-05404ca0d859",
"metadata": {},
"source": [
"### Step 3.2 MySQL to Spanner Parameters\n",
"- MYSQL_HOST: MySQL instance ip address\n",
"- MYSQL_PORT: MySQL instance port\n",
"- MYSQL_USERNAME: MySQL username\n",
"- MYSQL_PASSWORD: MySQL password\n",
"- MYSQL_DATABASE: Name of database that you want to migrate\n",
"- MYSQL_TABLE_LIST: List of tables you want to migrate e.g.: ['table1','table2'] else provide an empty list for migration whole database e.g.: [] \n",
"- MYSQL_READ_PARTITION_COLUMNS: Dictionary of custom read partition columns, e.g.: {'table2': 'secondary_id'}\n",
"- MYSQL_OUTPUT_SPANNER_MODE: Output mode for MySQL data one of (overwrite|append). Use append if schema already exists in Spanner\n",
"- SPANNER_INSTANCE: Cloud Spanner instance name\n",
"- SPANNER_DATABASE: Cloud Spanner database name\n",
"\n",
"Spanner requires primary key for each table\n",
"- SPANNER_TABLE_PRIMARY_KEYS: Dictionary of format {\"table_name\": \"primary_key_column1,primary_key_column2\"} for tables which do not have primary key in MySQL"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "71dd2824-e9a0-4ceb-a3c9-32f79973432a",
"metadata": {
"tags": []
},
"outputs": [],
"source": [
"if not IS_PARAMETERIZED:\n",
" MYSQL_HOST = \"\"\n",
" MYSQL_PORT = \"3306\"\n",
" MYSQL_USERNAME = \"\"\n",
" MYSQL_PASSWORD = \"\"\n",
" MYSQL_DATABASE = \"\"\n",
" MYSQL_TABLE_LIST = [] # Leave list empty for migrating complete database else provide tables as ['table1','table2']\n",
" MYSQL_READ_PARTITION_COLUMNS = {} # Leave empty for default read partition columns\n",
" MYSQL_OUTPUT_SPANNER_MODE = \"overwrite\" # one of overwrite|append (Use append when schema already exists in Spanner)\n",
"\n",
" SPANNER_INSTANCE = \"\"\n",
" SPANNER_DATABASE = \"\"\n",
" SPANNER_TABLE_PRIMARY_KEYS = {} # Provide tables which do not have PK in MySQL {\"table_name\":\"primary_key_column1,primary_key_column2\"}"
]
},
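{
"attachments": {},
"cell_type": "markdown",
"id": "c91a2d47",
"metadata": {},
"source": [
"As a purely hypothetical example (table, column, instance and database names below are illustrative, not taken from any real environment), a filled-in configuration could look like this:\n",
"\n",
"```python\n",
"MYSQL_TABLE_LIST = [\"employees\", \"departments\"]          # migrate only these two tables\n",
"MYSQL_READ_PARTITION_COLUMNS = {\"employees\": \"emp_no\"}   # optional custom read partition column\n",
"MYSQL_OUTPUT_SPANNER_MODE = \"overwrite\"                  # use \"append\" if the schema already exists in Spanner\n",
"SPANNER_INSTANCE = \"my-spanner-instance\"\n",
"SPANNER_DATABASE = \"my-spanner-db\"\n",
"SPANNER_TABLE_PRIMARY_KEYS = {\"departments\": \"dept_no\"}  # only needed for tables without a primary key in MySQL\n",
"```"
]
},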
{
"attachments": {},
"cell_type": "markdown",
"id": "166b1536-d58e-423b-b3c2-cc0c171d275e",
"metadata": {},
"source": [
"### Step 3.3 Notebook Configuration Parameters\n",
"Below variables should not be changed unless required"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "367f66b6",
"metadata": {},
"outputs": [],
"source": [
"cur_path = Path(os.getcwd())\n",
"\n",
"if IS_PARAMETERIZED:\n",
" WORKING_DIRECTORY = os.path.join(cur_path.parent ,'java')\n",
"else:\n",
" WORKING_DIRECTORY = os.path.join(cur_path.parent.parent ,'java')\n",
"\n",
"# If the above code doesn't fetches the correct path please\n",
"# provide complete path to python folder in your dataproc \n",
"# template repo which you cloned \n",
"\n",
"# WORKING_DIRECTORY = \"/home/jupyter/dataproc-templates/java/\"\n",
"print(WORKING_DIRECTORY)"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "c6f0f037-e888-4479-a143-f06a39bd5cc1",
"metadata": {
"tags": []
},
"outputs": [],
"source": [
"PYMYSQL_DRIVER = \"mysql+pymysql\"\n",
"JDBC_DRIVER = \"com.mysql.cj.jdbc.Driver\"\n",
"JDBC_URL = \"jdbc:mysql://{}:{}/{}?user={}&password={}\".format(MYSQL_HOST,MYSQL_PORT,MYSQL_DATABASE,MYSQL_USERNAME,MYSQL_PASSWORD)\n",
"MAIN_CLASS = \"com.google.cloud.dataproc.templates.main.DataProcTemplate\"\n",
"JAR_FILE = \"dataproc-templates-1.0-SNAPSHOT.jar\"\n",
"LOG4J_PROPERTIES_PATH = \"./src/test/resources\"\n",
"LOG4J_PROPERTIES = \"log4j-spark-driver-template.properties\"\n",
"PIPELINE_ROOT = GCS_STAGING_LOCATION + \"/pipeline_root/dataproc_pyspark\"\n",
"\n",
"# Adding Dataproc template JAR\n",
"JARS.append(GCS_STAGING_LOCATION + \"/\" + JAR_FILE)"
]
},
{
"attachments": {},
"cell_type": "markdown",
"id": "115c062b-5a91-4372-b440-5c37a12fbf87",
"metadata": {},
"source": [
"## Step 4: Generate MySQL Table List\n",
"This step creates list of tables for migration. If MYSQL_TABLE_LIST is empty then all the tables in the MYSQL_DATABASE are listed for migration otherwise the provided list is used"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "d0e362ac-30cd-4857-9e2a-0e9eb926e627",
"metadata": {
"tags": []
},
"outputs": [],
"source": [
"DB = sqlalchemy.create_engine(\n",
" sqlalchemy.engine.url.URL.create(\n",
" drivername=PYMYSQL_DRIVER,\n",
" username=MYSQL_USERNAME,\n",
" password=MYSQL_PASSWORD,\n",
" database=MYSQL_DATABASE,\n",
" host=MYSQL_HOST,\n",
" port=MYSQL_PORT\n",
" )\n",
")\n",
"input_mgr = JDBCInputManager.create(\"mysql\", DB)\n",
"\n",
"# Retrieve list of tables from database.\n",
"MYSQL_TABLE_LIST = input_mgr.build_table_list(schema_filter=MYSQL_DATABASE, table_filter=MYSQL_TABLE_LIST)\n",
"print(f\"Total tables to migrate from schema {MYSQL_DATABASE}:\", len(MYSQL_TABLE_LIST))\n",
" \n",
"print(\"List of tables for migration:\")\n",
"print(MYSQL_TABLE_LIST)"
]
},
{
"attachments": {},
"cell_type": "markdown",
"id": "1d9a62e8-7499-41c6-b32b-73b539b0c7c4",
"metadata": {},
"source": [
"## Step 5: Get Primary Keys for Tables Not Present in SPANNER_TABLE_PRIMARY_KEYS\n",
"For tables which do not have primary key provided in dictionary SPANNER_TABLE_PRIMARY_KEYS this step fetches primary key from MYSQL_DATABASE"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "6eda8fac-582c-4d4a-b871-311bb2863335",
"metadata": {
"tags": []
},
"outputs": [],
"source": [
"for table_name, pk_columns in input_mgr.get_primary_keys().items():\n",
" notebook_functions.update_spanner_primary_keys(SPANNER_TABLE_PRIMARY_KEYS, table_name, pk_columns)\n",
"\n",
"notebook_functions.remove_unexpected_spanner_primary_keys(SPANNER_TABLE_PRIMARY_KEYS, MYSQL_TABLE_LIST)"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "7c2a210f-48da-474f-bf46-89e755d01c67",
"metadata": {
"tags": []
},
"outputs": [],
"source": [
"pkDF = pd.DataFrame({\"table\" : MYSQL_TABLE_LIST,\n",
" \"primary_keys\": [SPANNER_TABLE_PRIMARY_KEYS.get(_) for _ in MYSQL_TABLE_LIST]})\n",
"print(\"Below are identified primary keys for migrating MySQL table to Spanner:\")\n",
"pkDF"
]
},
{
"attachments": {},
"cell_type": "markdown",
"id": "02748c28-54e9-466c-9537-c00569122a96",
"metadata": {},
"source": [
"## Step 6 Identify Read Partition Columns\n",
"This step uses PARTITION_THRESHOLD (default value is 1 million) parameter and any table having rows greater than PARTITION_THRESHOLD will be used for partitioned read based on Primary Keys\n",
" - PARTITION_OPTIONS: List will have table and its partitioned column and Spark SQL settings if exceeds the threshold"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "b4e4f6e9-c559-48e8-95fd-d2dd3e173439",
"metadata": {},
"outputs": [],
"source": [
"PARTITION_THRESHOLD = 200000 # Number of rows fetched per spark executor\n",
"PARTITION_OPTIONS = input_mgr.define_read_partitioning(\n",
" PARTITION_THRESHOLD, custom_partition_columns=MYSQL_READ_PARTITION_COLUMNS\n",
")\n",
"input_mgr.read_partitioning_df(PARTITION_OPTIONS)"
]
},
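{
"attachments": {},
"cell_type": "markdown",
"id": "d52b3e68",
"metadata": {},
"source": [
"For reference, each entry of `PARTITION_OPTIONS` maps a table name to the Spark JDBC read settings that Step 9 passes to the template. A hypothetical entry (names and values below are illustrative) might look like:\n",
"\n",
"```python\n",
"{\n",
"    \"employees\": {\n",
"        jdbc_input_manager_interface.SPARK_PARTITION_COLUMN: \"emp_no\",\n",
"        jdbc_input_manager_interface.SPARK_LOWER_BOUND: 1,\n",
"        jdbc_input_manager_interface.SPARK_UPPER_BOUND: 1000000,\n",
"        jdbc_input_manager_interface.SPARK_NUM_PARTITIONS: 5,\n",
"    }\n",
"}\n",
"```"
]
},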
{
"attachments": {},
"cell_type": "markdown",
"id": "0d9bb170-09c4-40d1-baaf-9e907f215889",
"metadata": {},
"source": [
"## Step 7: Calculate Parallel Jobs for MySQL to Cloud Spanner\n",
"This step uses MAX_PARALLELISM parameter to calculate number of parallel jobs to run"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "2c501db0-c1fb-4a05-88b8-a7e546e2b1d0",
"metadata": {
"tags": []
},
"outputs": [],
"source": [
"# Calculate parallel jobs:\n",
"JOB_LIST = notebook_functions.split_list(input_mgr.get_table_list(), MAX_PARALLELISM)\n",
"print(\"List of tables for execution:\")\n",
"print(JOB_LIST)"
]
},
{
"attachments": {},
"cell_type": "markdown",
"id": "1fa5f841-a687-4723-a8e6-6e7e752ba36e",
"metadata": {},
"source": [
"## Step 8: Create JAR files and Upload to Cloud Storage\n",
"#### Run Step 8 one time for each new notebook instance"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "22220ae3-9fb4-471c-b5aa-f606deeca15e",
"metadata": {
"tags": []
},
"outputs": [],
"source": [
"%cd $WORKING_DIRECTORY"
]
},
{
"attachments": {},
"cell_type": "markdown",
"id": "bdee7afc-699b-4c1a-aeec-df0f99764ae0",
"metadata": {},
"source": [
"#### Setting PATH variables for JDK and Maven and executing MAVEN build"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "ae36ebf4-5b9b-4341-b1c2-21c7e64f1051",
"metadata": {},
"outputs": [],
"source": [
"!wget https://downloads.mysql.com/archives/get/p/3/file/mysql-connector-java-8.0.29.tar.gz\n",
"!tar -xf mysql-connector-java-8.0.29.tar.gz\n",
"!mvn clean spotless:apply install -DskipTests "
]
},
{
"attachments": {},
"cell_type": "markdown",
"id": "9e1a779f-2c39-42ec-98be-0f5e9d715447",
"metadata": {},
"source": [
"#### Copying JARS Files to GCS_STAGING_LOCATION"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "939cdcd5-0f3e-4f51-aa78-93d1976cb0f4",
"metadata": {
"tags": []
},
"outputs": [],
"source": [
"!gsutil cp target/$JAR_FILE $GCS_STAGING_LOCATION/$JAR_FILE\n",
"!gsutil cp $LOG4J_PROPERTIES_PATH/$LOG4J_PROPERTIES $GCS_STAGING_LOCATION/$LOG4J_PROPERTIES\n",
"!gsutil cp mysql-connector-java-8.0.29/mysql-connector-java-8.0.29.jar $GCS_STAGING_LOCATION/jars/mysql-connector-java-8.0.29.jar"
]
},
{
"attachments": {},
"cell_type": "markdown",
"id": "78f6f83b-891a-4515-a1d6-f3406a25dc2a",
"metadata": {},
"source": [
"## Step 9: Execute Pipeline to Migrate Tables from MySQL to Spanner"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "b51912cd-17cb-4607-a1e3-9a4a599cd611",
"metadata": {
"tags": []
},
"outputs": [],
"source": [
"mysql_to_spanner_jobs = []"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "11e8a699-317d-46df-a4b1-7132e14ccdf4",
"metadata": {},
"outputs": [],
"source": [
"def migrate_mysql_to_spanner(EXECUTION_LIST):\n",
" EXECUTION_LIST = EXECUTION_LIST\n",
" aiplatform.init(project=PROJECT,staging_bucket=GCS_STAGING_LOCATION)\n",
" \n",
" @dsl.pipeline(\n",
" name=\"java-mysql-to-spanner-spark\",\n",
" description=\"Pipeline to get data from MySQL to Cloud Spanner\",\n",
" )\n",
" def pipeline(\n",
" PROJECT_ID: str = PROJECT,\n",
" LOCATION: str = REGION,\n",
" MAIN_CLASS: str = MAIN_CLASS,\n",
" JAR_FILE_URIS: list = JARS,\n",
" SUBNETWORK_URI: str = SUBNET,\n",
" FILE_URIS: list = [GCS_STAGING_LOCATION + \"/\" + LOG4J_PROPERTIES]\n",
" ):\n",
" for table in EXECUTION_LIST:\n",
" BATCH_ID = \"mysql2spanner-{}-{}\".format(table,datetime.now().strftime(\"%s\")).replace('_','-').lower()\n",
" mysql_to_spanner_jobs.append(BATCH_ID)\n",
" if table in PARTITION_OPTIONS.keys():\n",
" partition_options = PARTITION_OPTIONS[table]\n",
" TEMPLATE_SPARK_ARGS = [\n",
" \"--template=JDBCTOSPANNER\",\n",
" \"--templateProperty\", \"project.id={}\".format(PROJECT),\n",
" \"--templateProperty\", \"jdbctospanner.jdbc.url={}\".format(JDBC_URL),\n",
" \"--templateProperty\", \"jdbctospanner.jdbc.driver.class.name={}\".format(JDBC_DRIVER),\n",
" \"--templateProperty\", \"jdbctospanner.sql=select * from {}\".format(table),\n",
" \"--templateProperty\", \"jdbctospanner.output.instance={}\".format(SPANNER_INSTANCE),\n",
" \"--templateProperty\", \"jdbctospanner.output.database={}\".format(SPANNER_DATABASE),\n",
" \"--templateProperty\", \"jdbctospanner.output.table={}\".format(table),\n",
" \"--templateProperty\", \"jdbctospanner.output.saveMode={}\".format(MYSQL_OUTPUT_SPANNER_MODE.capitalize()),\n",
" \"--templateProperty\", \"jdbctospanner.output.primaryKey={}\".format(SPANNER_TABLE_PRIMARY_KEYS[table]),\n",
" \"--templateProperty\", \"jdbctospanner.output.batchInsertSize=200\",\n",
" \"--templateProperty\", \"jdbctospanner.sql.partitionColumn={}\".format(partition_options[jdbc_input_manager_interface.SPARK_PARTITION_COLUMN]),\n",
" \"--templateProperty\", \"jdbctospanner.sql.lowerBound={}\".format(partition_options[jdbc_input_manager_interface.SPARK_LOWER_BOUND]),\n",
" \"--templateProperty\", \"jdbctospanner.sql.upperBound={}\".format(partition_options[jdbc_input_manager_interface.SPARK_UPPER_BOUND]),\n",
" \"--templateProperty\", \"jdbctospanner.sql.numPartitions={}\".format(partition_options[jdbc_input_manager_interface.SPARK_NUM_PARTITIONS]),\n",
" ]\n",
" else:\n",
" TEMPLATE_SPARK_ARGS = [\n",
" \"--template=JDBCTOSPANNER\",\n",
" \"--templateProperty\", \"project.id={}\".format(PROJECT),\n",
" \"--templateProperty\", \"jdbctospanner.jdbc.url={}\".format(JDBC_URL),\n",
" \"--templateProperty\", \"jdbctospanner.jdbc.driver.class.name={}\".format(JDBC_DRIVER),\n",
" \"--templateProperty\", \"jdbctospanner.sql=select * from {}\".format(table),\n",
" \"--templateProperty\", \"jdbctospanner.output.instance={}\".format(SPANNER_INSTANCE),\n",
" \"--templateProperty\", \"jdbctospanner.output.database={}\".format(SPANNER_DATABASE),\n",
" \"--templateProperty\", \"jdbctospanner.output.table={}\".format(table),\n",
" \"--templateProperty\", \"jdbctospanner.output.saveMode={}\".format(MYSQL_OUTPUT_SPANNER_MODE.capitalize()),\n",
" \"--templateProperty\", \"jdbctospanner.output.primaryKey={}\".format(SPANNER_TABLE_PRIMARY_KEYS[table]),\n",
" \"--templateProperty\", \"jdbctospanner.output.batchInsertSize=200\",\n",
" ]\n",
" _ = DataprocSparkBatchOp(\n",
" project=PROJECT_ID,\n",
" location=LOCATION,\n",
" batch_id=BATCH_ID,\n",
" main_class=MAIN_CLASS,\n",
" jar_file_uris=JAR_FILE_URIS,\n",
" file_uris=FILE_URIS,\n",
" subnetwork_uri=SUBNETWORK_URI,\n",
" runtime_config_version=\"1.1\", # issue 665\n",
" service_account=DATAPROC_SERVICE_ACCOUNT,\n",
" args=TEMPLATE_SPARK_ARGS\n",
" )\n",
" time.sleep(1)\n",
"\n",
" compiler.Compiler().compile(pipeline_func=pipeline, package_path=\"pipeline.json\")\n",
"\n",
" pipeline = aiplatform.PipelineJob(\n",
" display_name=\"pipeline\",\n",
" template_path=\"pipeline.json\",\n",
" pipeline_root=PIPELINE_ROOT,\n",
" enable_caching=False,\n",
" location=REGION,\n",
" )\n",
" # run() method has an optional parameter `service_account` which you can pass if you want to run pipeline using\n",
" # specific service account instead of default service account \n",
" # eg. pipeline.run(service_account='test@project_id.iam.gserviceaccount.com')\n",
" pipeline.run()"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "e696ce34-b5a3-4f5d-98a7-ac881007c6c5",
"metadata": {
"tags": []
},
"outputs": [],
"source": [
"for execution_list in JOB_LIST:\n",
" print(execution_list)\n",
" migrate_mysql_to_spanner(execution_list)"
]
},
{
"attachments": {},
"cell_type": "markdown",
"id": "9ce7f828-dacc-404b-8927-dc3813e7216a",
"metadata": {},
"source": [
"## Step 10: Get status for tables migrated from MySql to Spanner"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "c282b2b9-b126-4cb6-a513-14a6322650c0",
"metadata": {
"tags": []
},
"outputs": [],
"source": [
"def get_bearer_token():\n",
" \n",
" try:\n",
" #Defining Scope\n",
" CREDENTIAL_SCOPES = [\"https://www.googleapis.com/auth/cloud-platform\"]\n",
"\n",
" #Assining credentials and project value\n",
" credentials, project_id = google.auth.default(scopes=CREDENTIAL_SCOPES)\n",
"\n",
" #Refreshing credentials data\n",
" credentials.refresh(requests.Request())\n",
"\n",
" #Get refreshed token\n",
" token = credentials.token\n",
" if token:\n",
" return (token,200)\n",
" else:\n",
" return \"Bearer token not generated\"\n",
" except Exception as error:\n",
" return (\"Bearer token not generated. Error : {}\".format(error),500)"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "d1fcbc63-19db-42a8-a2ed-d9855da00c04",
"metadata": {
"tags": []
},
"outputs": [],
"source": [
"from google.auth.transport import requests\n",
"import google\n",
"token = get_bearer_token()\n",
"if token[1] == 200:\n",
" print(\"Bearer token generated\")\n",
"else:\n",
" print(token)"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "5be3cf87-6d28-4b23-8466-87d3399f7a29",
"metadata": {
"tags": []
},
"outputs": [],
"source": [
"import requests\n",
"\n",
"mysql_to_spanner_status = []\n",
"job_status_url = \"https://dataproc.googleapis.com/v1/projects/{}/locations/{}/batches/{}\"\n",
"for job in mysql_to_spanner_jobs:\n",
" auth = \"Bearer \" + token[0]\n",
" url = job_status_url.format(PROJECT,REGION,job)\n",
" headers = {\n",
" 'Content-Type': 'application/json; charset=UTF-8',\n",
" 'Authorization': auth \n",
" }\n",
" response = requests.get(url, headers=headers)\n",
" mysql_to_spanner_status.append(response.json()['state'])"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "1097575d-07c2-4659-a75f-d7e898e3f077",
"metadata": {
"tags": []
},
"outputs": [],
"source": [
"statusDF = pd.DataFrame({\"table\": MYSQL_TABLE_LIST, \"mysql_to_spanner_job\" : mysql_to_spanner_jobs, \"mysql_to_spanner_status\": mysql_to_spanner_status})\n",
"statusDF"
]
},
{
"attachments": {},
"cell_type": "markdown",
"id": "0961f164-c7e4-4bb5-80f0-25fd1051147b",
"metadata": {},
"source": [
"## Step 11: Validate Row Counts of Migrated Tables from MySQL to Cloud Spanner"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "25299344-c167-4764-a5d1-56c1b384d104",
"metadata": {
"tags": []
},
"outputs": [],
"source": [
"# Get MySQL table counts\n",
"mysql_row_count = input_mgr.get_table_list_with_counts()"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "ab0e539d-5180-4f5b-915e-35f7ea45e0d3",
"metadata": {
"tags": []
},
"outputs": [],
"source": [
"# Get Cloud Spanner table counts\n",
"spanner_row_count = []\n",
"from google.cloud import spanner\n",
"\n",
"spanner_client = spanner.Client()\n",
"instance = spanner_client.instance(SPANNER_INSTANCE)\n",
"database = instance.database(SPANNER_DATABASE)\n",
"\n",
"for table in MYSQL_TABLE_LIST:\n",
" with database.snapshot() as snapshot:\n",
" qry = \"@{{USE_ADDITIONAL_PARALLELISM=true}} select count(1) from {}\".format(table)\n",
" results = snapshot.execute_sql(qry)\n",
" for row in results:\n",
" spanner_row_count.append(row[0])"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "4b1afe12-3eb9-4133-8377-66dc63ac649c",
"metadata": {
"tags": []
},
"outputs": [],
"source": [
"statusDF['mysql_row_count'] = mysql_row_count \n",
"statusDF['spanner_row_count'] = spanner_row_count \n",
"statusDF"
]
},
{
"attachments": {},
"cell_type": "markdown",
"id": "9f30ffe4-efb5-4b84-b364-c40a296e95f7",
"metadata": {},
"source": [
"## Post data loading activities\n",
"- You may create relationships (FKs), constraints and indexes (as needed).\n",
"- You may configure countinuous replication with [DataStream](https://cloud.google.com/datastream/docs/configure-your-source-mysql-database) or any other 3rd party tools."
]
},
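{
"attachments": {},
"cell_type": "markdown",
"id": "e63c4f79",
"metadata": {},
"source": [
"The optional cell below is an illustrative sketch of adding an index and a foreign key with the Spanner client's `update_ddl` method, reusing the `database` handle created in Step 11. The table, column, index and constraint names are hypothetical; replace them with your own schema before uncommenting."
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "f74d5a80",
"metadata": {},
"outputs": [],
"source": [
"# Illustrative sketch only: hypothetical table/column names, adjust to your schema before running.\n",
"# ddl_statements = [\n",
"#     \"CREATE INDEX idx_employees_dept ON employees(dept_no)\",\n",
"#     \"ALTER TABLE employees ADD CONSTRAINT fk_emp_dept FOREIGN KEY (dept_no) REFERENCES departments (dept_no)\",\n",
"# ]\n",
"# operation = database.update_ddl(ddl_statements)\n",
"# operation.result()  # wait for the schema change to complete\n",
"# print(\"Post-load DDL applied\")"
]
},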
{
"cell_type": "code",
"execution_count": null,
"id": "89ec62c1-0b95-4536-9339-03a4a8de035e",
"metadata": {},
"outputs": [],
"source": []
}
],
"metadata": {
"environment": {
"kernel": "python3",
"name": "common-cpu.m107",
"type": "gcloud",
"uri": "gcr.io/deeplearning-platform-release/base-cpu:m107"
},
"kernelspec": {
"display_name": "Python 3 (ipykernel) (Local)",
"language": "python",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.9.9"
},
"vscode": {
"interpreter": {
"hash": "31f2aee4e71d21fbe5cf8b01ff0e069b9275f58929596ceb00d14d90e3e16cd6"
}
}
},
"nbformat": 4,
"nbformat_minor": 5
}