notebooks/oracle2spanner/OracleToSpanner_notebook.ipynb

{ "cells": [ { "cell_type": "code", "execution_count": null, "id": "98acd907", "metadata": {}, "outputs": [], "source": [ "# Copyright 2022 Google LLC\n", "#\n", "# Licensed under the Apache License, Version 2.0 (the \"License\");\n", "# you may not use this file except in compliance with the License.\n", "# You may obtain a copy of the License at\n", "#\n", "# https://www.apache.org/licenses/LICENSE-2.0\n", "#\n", "# Unless required by applicable law or agreed to in writing, software\n", "# distributed under the License is distributed on an \"AS IS\" BASIS,\n", "# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.\n", "# See the License for the specific language governing permissions and\n", "# limitations under the License." ] },
{ "cell_type": "markdown", "id": "1db5371a-8f16-47b7-bcc7-5af386e9b6d8", "metadata": {}, "source": [ "# <center>Oracle to Cloud Spanner\n", "<table align=\"left\">\n", "<td>\n", " <a href=\"https://colab.research.google.com/github/GoogleCloudPlatform/dataproc-templates/blob/main/notebooks/oracle2spanner/OracleToSpanner_notebook.ipynb\">\n", " <img src=\"../images/colab-logo-32px.png\" alt=\"Colab logo\" />Run in Colab\n", " </a>\n", "</td>\n", "<td>\n", " <a href=\"https://console.cloud.google.com/vertex-ai/colab/import/https:%2F%2Fraw.githubusercontent.com%2FGoogleCloudPlatform%2Fdataproc-templates%2Fmain%2Fnotebooks%2Foracle2spanner%2FOracleToSpanner_notebook.ipynb\">\n", " <img src=\"../images/colab-enterprise-logo-32px.png\" alt=\"GCP Colab Enterprise logo\" />Run in Colab Enterprise\n", " </a>\n", "</td>\n", "<td>\n", " <a href=\"https://github.com/GoogleCloudPlatform/dataproc-templates/blob/main/notebooks/oracle2spanner/OracleToSpanner_notebook.ipynb\">\n", " <img src=\"../images/github-logo-32px.png\" alt=\"GitHub logo\" />View on GitHub\n", " </a>\n", "</td>\n", "<td>\n", " <a href=\"https://console.cloud.google.com/vertex-ai/workbench/deploy-notebook?download_url=https://raw.githubusercontent.com/GoogleCloudPlatform/dataproc-templates/main/notebooks/oracle2spanner/OracleToSpanner_notebook.ipynb\">\n", " <img src=\"../images/vertexai.jpg\" alt=\"Vertex AI logo\" />Open in Vertex AI Workbench\n", " </a>\n", "</td>\n", "</table>" ] },
{ "cell_type": "markdown", "id": "dd944742", "metadata": {}, "source": [ "#### References\n", "\n", "- [DataprocPySparkBatchOp reference](https://google-cloud-pipeline-components.readthedocs.io/en/google-cloud-pipeline-components-1.0.0/google_cloud_pipeline_components.experimental.dataproc.html)\n", "- [Kubeflow SDK Overview](https://www.kubeflow.org/docs/components/pipelines/sdk/sdk-overview/)\n", "- [Dataproc Serverless in Vertex AI Pipelines tutorial](https://github.com/GoogleCloudPlatform/vertex-ai-samples/blob/main/notebooks/community/ml_ops/stage3/get_started_with_dataproc_serverless_pipeline_components.ipynb)\n", "- [Build a Vertex AI Pipeline](https://cloud.google.com/vertex-ai/docs/pipelines/build-pipeline)\n", "\n", "#### Overview - Oracle to Cloud Spanner Migration\n", "This notebook walks through the step-by-step process of migrating Oracle database tables to Cloud Spanner using a Dataproc template. This notebook solution uses the [JDBCTOSPANNER](../../java/src/main/java/com/google/cloud/dataproc/templates/jdbc/README.md) Spark (Java) template. The migration creates the Spanner tables if they do not exist, and it will overwrite or append data based on the write mode configured in the notebook.\n", "##### Feedback\n", "Share your feedback, ideas, and thoughts via the [feedback form](https://forms.gle/XXCJeWeCJJ9fNLQS6). 
\n", "Questions, issues, and comments should be directed to dataproc-templates-support-external@googlegroups.com\n", "\n", "#### Permissions\n", "This notebook is built to run in a Vertex AI User-Managed Notebook using the default Compute Engine Service Account. \n", "Check the Dataproc Serverless in Vertex AI Pipelines tutorial linked above to learn how to set up a different Service Account. If you use a custom service account, the service account attached to the Vertex AI notebook must have the Service Account User role on that custom service account so that jobs can run as it.\n", "\n", "Make sure that the service account used to run the notebook has the following roles:\n", "\n", "- roles/aiplatform.serviceAgent\n", "- roles/aiplatform.customCodeServiceAgent\n", "- roles/storage.objectCreator\n", "- roles/storage.objectViewer\n", "- roles/dataproc.editor\n", "- roles/dataproc.worker\n", "- roles/bigquery.dataEditor" ] },
{ "cell_type": "markdown", "id": "7d89b301-5249-462a-97d8-986488b303fd", "metadata": {}, "source": [ "## Step 1: Install Libraries\n", "#### Run Step 1 one time for each new notebook instance" ] },
{ "cell_type": "code", "execution_count": null, "id": "fef65ec2-ad6b-407f-a993-7cdf871bba11", "metadata": { "tags": [] }, "outputs": [], "source": [ "! pip3 install SQLAlchemy\n", "! pip3 install --upgrade google-cloud-pipeline-components kfp --user -q\n", "! pip3 install cx-Oracle\n", "! pip3 install google-cloud-spanner" ] },
{ "cell_type": "code", "execution_count": null, "id": "0ed8fb64-8bae-4ff4-9e87-fdcc8ee04bcf", "metadata": {}, "outputs": [], "source": [ "!sudo apt-get update -y\n", "!sudo apt-get install default-jdk -y\n", "!sudo apt-get install maven -y" ] },
{ "cell_type": "markdown", "id": "f023c6c2", "metadata": {}, "source": [ "#### Oracle Client Installation" ] },
{ "cell_type": "code", "execution_count": null, "id": "0e90943f-b965-4f7f-b631-ce62227d5e83", "metadata": { "tags": [] }, "outputs": [], "source": [ "%%bash\n", "sudo mkdir -p /opt/oracle\n", "sudo rm -fr /opt/oracle/instantclient*\n", "cd /opt/oracle\n", "sudo wget --no-verbose https://download.oracle.com/otn_software/linux/instantclient/instantclient-basic-linuxx64.zip\n", "sudo unzip instantclient-basic-linuxx64.zip\n", "INSTANT_CLIENT_DIR=$(find /opt/oracle -maxdepth 1 -type d -name \"instantclient_[0-9]*_[0-9]*\" | sort | tail -1)\n", "test -n \"${INSTANT_CLIENT_DIR}\" || echo \"ERROR: Could not find instant client\"\n", "test -n \"${INSTANT_CLIENT_DIR}\" || exit 1\n", "sudo apt-get install -y libaio1\n", "sudo sh -c \"echo ${INSTANT_CLIENT_DIR} > /etc/ld.so.conf.d/oracle-instantclient.conf\"\n", "sudo ldconfig\n", "export LD_LIBRARY_PATH=${INSTANT_CLIENT_DIR}:$LD_LIBRARY_PATH" ] },
{ "attachments": {}, "cell_type": "markdown", "id": "35712473-92ef-433b-9ce6-b5649357c09e", "metadata": {}, "source": [ "#### Once you've installed the additional packages, you may need to restart the notebook kernel so it can find the packages.\n", "\n", "Uncomment and run this cell if you installed anything with the commands above" ] },
{ "cell_type": "code", "execution_count": null, "id": "19ef1307-902e-4713-8948-b86084e19312", "metadata": {}, "outputs": [], "source": [ "# import os\n", "# import IPython\n", "# if not os.getenv(\"IS_TESTING\"):\n", "#     app = IPython.Application.instance()\n", "#     app.kernel.do_shutdown(True)" ] },
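{ "cell_type": "code", "execution_count": null, "id": "7a1c9f3e", "metadata": {}, "outputs": [], "source": [ "# Optional sanity check (a minimal sketch, not part of the original template flow): after the\n", "# kernel restart, uncomment to confirm that cx_Oracle can load the Instant Client installed above.\n", "# A version tuple is printed when the client libraries are correctly configured.\n", "# import cx_Oracle\n", "# print(\"Oracle Instant Client version:\", cx_Oracle.clientversion())" ] },
{ "cell_type": "markdown", "id": "70d01e33-9099-4d2e-b57e-575c3a998d84", "metadata": {}, "source": [ "## Step 2: Import Libraries" ] },
{ "cell_type": "code", "execution_count": null, "id": "2703b502-1b41-44f1-bf21-41069255bc32", "metadata": 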
{ "tags": [] }, "outputs": [], "source": [ "from datetime import datetime\n", "import os\n", "from pathlib import Path\n", "import sys\n", "import time\n", "\n", "import google.cloud.aiplatform as aiplatform\n", "from kfp import dsl\n", "from kfp import compiler\n", "\n", "try:\n", "    from google_cloud_pipeline_components.experimental.dataproc import DataprocSparkBatchOp\n", "except ModuleNotFoundError:\n", "    from google_cloud_pipeline_components.v1.dataproc import DataprocSparkBatchOp\n", "\n", "import pandas as pd\n", "import sqlalchemy\n", "\n", "module_path = os.path.abspath(os.pardir)\n", "if module_path not in sys.path:\n", "    sys.path.append(module_path)\n", "\n", "from util.jdbc.jdbc_input_manager import JDBCInputManager\n", "from util.jdbc import jdbc_input_manager_interface\n", "from util import notebook_functions" ] },
{ "cell_type": "markdown", "id": "09c4a209-db59-42f6-bba7-30cd46b16bad", "metadata": {}, "source": [ "## Step 3: Assign Parameters" ] },
{ "attachments": {}, "cell_type": "markdown", "id": "92d3fbd8-013f-45e6-b7e9-8f31a4580e91", "metadata": {}, "source": [ "### Step 3.1 Common Parameters\n", " \n", "- PROJECT : GCP project-id\n", "- REGION : GCP region\n", "- GCS_STAGING_LOCATION : Cloud Storage staging location to be used for this notebook to store artifacts\n", "- SUBNET : VPC subnet\n", "- JARS : List of JAR files. For this notebook the Oracle JDBC connector is required in addition to the Dataproc template JAR\n", "- MAX_PARALLELISM : Maximum number of jobs to run in parallel; the default value is 5\n", "- SERVICE_ACCOUNT : Custom service account email to use for the Vertex AI pipeline and Dataproc jobs, with the permissions mentioned above" ] },
{ "cell_type": "code", "execution_count": null, "id": "bd8f6dd9-2e13-447c-b28d-10fa2321b759", "metadata": { "tags": [] }, "outputs": [], "source": [ "PROJECT = \"<project-id>\"\n", "REGION = \"<region>\"\n", "GCS_STAGING_LOCATION = \"<gs://bucket/[folder]>\"\n", "SUBNET = \"<projects/{project}/regions/{region}/subnetworks/{subnet}>\"\n", "MAX_PARALLELISM = 5\n", "SERVICE_ACCOUNT = \"<Service-Account>\"\n", "\n", "OJDBC_JAR = \"ojdbc8-21.7.0.0.jar\" # For Oracle 11g use ojdbc6-11.2.0.4.jar\n", "# Do not change this parameter unless you want to load the JARS below from a different location\n", "JARS = [f\"{GCS_STAGING_LOCATION}/jars/{OJDBC_JAR}\"]" ] },
{ "cell_type": "code", "execution_count": null, "id": "128c08ea-67c2-4afb-b04f-33ec6351a7c7", "metadata": {}, "outputs": [], "source": [ "# If SERVICE_ACCOUNT is not specified, the one attached to the notebook is used\n", "if SERVICE_ACCOUNT == \"\":\n", "    shell_output = !gcloud auth list 2>/dev/null\n", "    SERVICE_ACCOUNT = shell_output[2].replace(\"*\", \"\").strip()\n", "    print(\"Service Account:\", SERVICE_ACCOUNT)" ] },
{ "attachments": {}, "cell_type": "markdown", "id": "051df2af-bd8b-47c7-8cb2-05404ca0d859", "metadata": {}, "source": [ "### Step 3.2 Oracle to Cloud Spanner Parameters\n", "- ORACLE_HOST: Oracle instance IP address\n", "- ORACLE_PORT: Oracle instance port\n", "- ORACLE_USERNAME: Oracle username\n", "- ORACLE_PASSWORD: Oracle password\n", "- ORACLE_DATABASE: Name of database/service for Oracle connection\n", "- ORACLE_SCHEMA: Schema to be exported, leave blank to export tables owned by ORACLE_USERNAME\n", "- ORACLE_TABLE_LIST: List of tables to migrate, e.g. ['table1', 'table2']; provide an empty list [] to migrate the whole database\n", "- ORACLE_READ_PARTITION_COLUMNS: Dictionary of custom read partition columns, e.g.: {'table2': 'secondary_id'}\n", "- SPANNER_INSTANCE: Cloud 
Spanner instance name\n", "- SPANNER_DATABASE: Cloud Spanner database name\n", "- SPANNER_TABLE_PRIMARY_KEYS: Dictionary of format {\"table_name\": \"primary_key_column1,primary_key_column2\"} for tables which do not have a primary key in Oracle\n", "- SPANNER_OUTPUT_MODE: <Append | Overwrite>" ] },
{ "cell_type": "code", "execution_count": null, "id": "71dd2824-e9a0-4ceb-a3c9-32f79973432a", "metadata": { "tags": [] }, "outputs": [], "source": [ "ORACLE_HOST = \"<host-ip>\"\n", "ORACLE_PORT = \"<port>\"\n", "ORACLE_USERNAME = \"<username>\"\n", "ORACLE_PASSWORD = \"<password>\"\n", "ORACLE_DATABASE = \"<database>\"\n", "ORACLE_SCHEMA = \"\" # Leave empty to default to ORACLE_USERNAME\n", "ORACLE_TABLE_LIST = [] # Leave the list empty to migrate the complete database, else provide tables as ['table1','table2']\n", "ORACLE_READ_PARTITION_COLUMNS = {} # Leave empty for default read partition columns\n", "\n", "SPANNER_OUTPUT_MODE = \"<Overwrite | Append>\"\n", "SPANNER_INSTANCE = \"<spanner-instance>\"\n", "SPANNER_DATABASE = \"<spanner-database>\"\n", "SPANNER_TABLE_PRIMARY_KEYS = {} # Provide tables which do not have a PK in Oracle as {\"table_name\":\"primary_key_column1,primary_key_column2\"}" ] },
{ "attachments": {}, "cell_type": "markdown", "id": "166b1536-d58e-423b-b3c2-cc0c171d275e", "metadata": {}, "source": [ "### Step 3.3 Notebook Configuration Parameters\n", "The variables below should not be changed unless required\n", "- ORACLE_URL: Oracle Python URL\n", "- JDBC_DRIVER: JDBC driver class\n", "- JDBC_URL: Oracle JDBC URL\n", "- JDBC_FETCH_SIZE: Determines how many rows to fetch per round trip\n", "- JDBC_SESSION_INIT_STATEMENT: Custom SQL statement to execute in each reader database session\n", "- MAIN_CLASS: Dataproc Template Main Class\n", "- WORKING_DIRECTORY: Java working directory\n", "- PACKAGE_EGG_FILE: Dataproc Template distribution file\n", "- PIPELINE_ROOT: Path to Vertex AI pipeline artifacts" ] },
{ "cell_type": "code", "execution_count": null, "id": "bcb42b2f", "metadata": {}, "outputs": [], "source": [ "cur_path = Path(os.getcwd())\n", "WORKING_DIRECTORY = os.path.join(cur_path.parent.parent, 'java')\n", "\n", "# If the above code doesn't fetch the correct path please\n", "# provide the complete path to the java folder in the dataproc\n", "# templates repo which you cloned.\n", "\n", "# WORKING_DIRECTORY = \"/home/jupyter/dataproc-templates/java/\"\n", "print(WORKING_DIRECTORY)" ] },
{ "cell_type": "code", "execution_count": null, "id": "c6f0f037-e888-4479-a143-f06a39bd5cc1", "metadata": { "tags": [] }, "outputs": [], "source": [ "ORACLE_URL = \"oracle://{}:{}@{}:{}?service_name={}\".format(ORACLE_USERNAME, ORACLE_PASSWORD, ORACLE_HOST, ORACLE_PORT, ORACLE_DATABASE)\n", "JDBC_DRIVER = \"oracle.jdbc.OracleDriver\"\n", "JDBC_URL = \"jdbc:oracle:thin:{}/{}@{}:{}/{}\".format(ORACLE_USERNAME, ORACLE_PASSWORD, ORACLE_HOST, ORACLE_PORT, ORACLE_DATABASE)\n", "JDBC_FETCH_SIZE = 200\n", "JDBC_SESSION_INIT_STATEMENT = \"BEGIN DBMS_APPLICATION_INFO.SET_MODULE('Dataproc Templates','OracleToSpanner Notebook'); END;\"\n", "MAIN_CLASS = \"com.google.cloud.dataproc.templates.main.DataProcTemplate\"\n", "JAR_FILE = \"dataproc-templates-1.0-SNAPSHOT.jar\"\n", "LOG4J_PROPERTIES_PATH = \"./src/test/resources\"\n", "LOG4J_PROPERTIES = \"log4j-spark-driver-template.properties\"\n", "PIPELINE_ROOT = GCS_STAGING_LOCATION + \"/pipeline_root/dataproc_pyspark\"\n", "\n", "# Adding Dataproc template JAR\n", "JARS.append(GCS_STAGING_LOCATION + \"/\" + JAR_FILE)" ] },
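{ "cell_type": "code", "execution_count": null, "id": "b7e1c2a9", "metadata": {}, "outputs": [], "source": [ "# Optional connectivity check (a minimal sketch, not part of the original template flow): it verifies\n", "# the Oracle parameters above before any migration step runs. Assumes the cx_Oracle Instant Client\n", "# from Step 1 is installed and the ORACLE_* values are filled in; check_engine is a throwaway name.\n", "check_engine = sqlalchemy.create_engine(ORACLE_URL)\n", "with check_engine.connect() as conn:\n", "    print(\"Oracle connectivity OK:\", conn.execute(sqlalchemy.text(\"SELECT 1 FROM DUAL\")).scalar())" ] },
{ "attachments": {}, "cell_type": "markdown", 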
"id": "115c062b-5a91-4372-b440-5c37a12fbf87", "metadata": {}, "source": [ "## Step 4: Generate Oracle Table List\n", "This step creates the list of tables for migration. If ORACLE_TABLE_LIST is left empty, all tables in the ORACLE_DATABASE are listed for migration; otherwise the provided list is used" ] },
{ "cell_type": "code", "execution_count": null, "id": "d0e362ac-30cd-4857-9e2a-0e9eb926e627", "metadata": { "tags": [] }, "outputs": [], "source": [ "input_mgr = JDBCInputManager.create(\"oracle\", sqlalchemy.create_engine(ORACLE_URL))\n", "\n", "# Retrieve list of tables from database.\n", "ORACLE_TABLE_LIST = input_mgr.build_table_list(schema_filter=ORACLE_SCHEMA, table_filter=ORACLE_TABLE_LIST)\n", "ORACLE_SCHEMA = input_mgr.get_schema()\n", "print(f\"Total tables to migrate from schema {ORACLE_SCHEMA}:\", len(ORACLE_TABLE_LIST))\n", "\n", "print(\"List of tables for migration:\")\n", "print(ORACLE_TABLE_LIST)" ] },
{ "attachments": {}, "cell_type": "markdown", "id": "1d9a62e8-7499-41c6-b32b-73b539b0c7c4", "metadata": {}, "source": [ "## Step 5: Get Primary Keys for Tables Not Present in SPANNER_TABLE_PRIMARY_KEYS\n", "For tables whose primary key is not provided in the SPANNER_TABLE_PRIMARY_KEYS dictionary, this step fetches the primary key from ORACLE_DATABASE" ] },
{ "cell_type": "code", "execution_count": null, "id": "6eda8fac-582c-4d4a-b871-311bb2863335", "metadata": { "tags": [] }, "outputs": [], "source": [ "for table_name, pk_columns in input_mgr.get_primary_keys().items():\n", "    notebook_functions.update_spanner_primary_keys(SPANNER_TABLE_PRIMARY_KEYS, table_name, pk_columns)\n", "\n", "notebook_functions.remove_unexpected_spanner_primary_keys(SPANNER_TABLE_PRIMARY_KEYS, ORACLE_TABLE_LIST)" ] },
{ "cell_type": "code", "execution_count": null, "id": "7c2a210f-48da-474f-bf46-89e755d01c67", "metadata": { "tags": [] }, "outputs": [], "source": [ "pkDF = pd.DataFrame({\"table\": ORACLE_TABLE_LIST,\n", "                     \"primary_keys\": [SPANNER_TABLE_PRIMARY_KEYS.get(_) for _ in ORACLE_TABLE_LIST]})\n", "print(\"Below are the identified primary keys for migrating the Oracle tables to Spanner:\")\n", "pkDF" ] },
{ "attachments": {}, "cell_type": "markdown", "id": "02748c28-54e9-466c-9537-c00569122a96", "metadata": {}, "source": [ "## Step 6: Identify Read Partition Columns\n", "This step uses the PARTITION_THRESHOLD parameter (default value 1 million); any table with more rows than PARTITION_THRESHOLD will use a partitioned read based on its primary key\n", " - PARTITION_OPTIONS: Dictionary mapping each table that exceeds the threshold to its partition column and Spark read partitioning settings" ] },
{ "cell_type": "code", "execution_count": null, "id": "b4e4f6e9-c559-48e8-95fd-d2dd3e173439", "metadata": {}, "outputs": [], "source": [ "PARTITION_THRESHOLD = 1000000\n", "PARTITION_OPTIONS = input_mgr.define_read_partitioning(\n", "    PARTITION_THRESHOLD, custom_partition_columns=ORACLE_READ_PARTITION_COLUMNS\n", ")\n", "input_mgr.read_partitioning_df(PARTITION_OPTIONS)" ] },
{ "attachments": {}, "cell_type": "markdown", "id": "1fa5f841-a687-4723-a8e6-6e7e752ba36e", "metadata": {}, "source": [ "## Step 7: Download JAR Files and Upload to Cloud Storage (only required to run one time)\n", "#### Run Step 7 one time for each new notebook instance" ] },
{ "cell_type": "code", "execution_count": null, "id": "22220ae3-9fb4-471c-b5aa-f606deeca15e", "metadata": { "tags": [] }, "outputs": [], "source": [ "%cd $WORKING_DIRECTORY" ] },
{ "cell_type": "markdown", "id": "bdee7afc-699b-4c1a-aeec-df0f99764ae0", "metadata": {}, "source": [ 
"#### Setting PATH variables for JDK and Maven and executing the Maven build" ] },
{ "cell_type": "code", "execution_count": null, "id": "4b40f634-1983-4267-a4c1-b072bf6d81ae", "metadata": { "tags": [] }, "outputs": [], "source": [ "OJDBC_PATH = os.path.splitext(OJDBC_JAR)[0].replace(\"-\", \"/\")\n", "!wget --no-verbose https://repo1.maven.org/maven2/com/oracle/database/jdbc/$OJDBC_PATH/$OJDBC_JAR\n", "!mvn -q clean spotless:apply install -DskipTests" ] },
{ "cell_type": "markdown", "id": "9e1a779f-2c39-42ec-98be-0f5e9d715447", "metadata": {}, "source": [ "#### Copying JAR files to GCS_STAGING_LOCATION" ] },
{ "cell_type": "code", "execution_count": null, "id": "939cdcd5-0f3e-4f51-aa78-93d1976cb0f4", "metadata": { "tags": [] }, "outputs": [], "source": [ "!gsutil cp target/$JAR_FILE $GCS_STAGING_LOCATION/$JAR_FILE\n", "!gsutil cp $LOG4J_PROPERTIES_PATH/$LOG4J_PROPERTIES $GCS_STAGING_LOCATION/$LOG4J_PROPERTIES\n", "!gsutil cp $OJDBC_JAR $GCS_STAGING_LOCATION/jars/$OJDBC_JAR" ] },
{ "cell_type": "markdown", "id": "0d9bb170-09c4-40d1-baaf-9e907f215889", "metadata": {}, "source": [ "## Step 8: Calculate Parallel Jobs for Oracle to Cloud Spanner\n", "This step uses the MAX_PARALLELISM parameter to calculate the number of parallel jobs to run" ] },
{ "cell_type": "code", "execution_count": null, "id": "2c501db0-c1fb-4a05-88b8-a7e546e2b1d0", "metadata": { "tags": [] }, "outputs": [], "source": [ "# Calculate parallel jobs:\n", "JOB_LIST = notebook_functions.split_list(input_mgr.get_table_list(), MAX_PARALLELISM)\n", "print(\"List of tables for execution:\")\n", "print(JOB_LIST)" ] },
{ "attachments": {}, "cell_type": "markdown", "id": "78f6f83b-891a-4515-a1d6-f3406a25dc2a", "metadata": {}, "source": [ "## Step 9: Execute Pipeline to Migrate Tables from Oracle to Cloud Spanner" ] },
{ "cell_type": "code", "execution_count": null, "id": "1f98914b-bd74-4d0e-9562-7019d504a25e", "metadata": { "tags": [] }, "outputs": [], "source": [ "oracle_to_spanner_jobs = []" ] },
{ "cell_type": "code", "execution_count": null, "id": "863fa2d8-4ef7-4722-87c8-eec6c06f892b", "metadata": { "tags": [] }, "outputs": [], "source": [ "def migrate_oracle_to_spanner(EXECUTION_LIST):\n", "    aiplatform.init(project=PROJECT, staging_bucket=GCS_STAGING_LOCATION)\n", "\n", "    @dsl.pipeline(\n", "        name=\"java-oracle-to-spanner-spark\",\n", "        description=\"Pipeline to get data from Oracle to Cloud Spanner\",\n", "    )\n", "    def pipeline(\n", "        PROJECT_ID: str = PROJECT,\n", "        LOCATION: str = REGION,\n", "        MAIN_CLASS: str = MAIN_CLASS,\n", "        JAR_FILE_URIS: list = JARS,\n", "        SUBNETWORK_URI: str = SUBNET,\n", "        SERVICE_ACCOUNT: str = SERVICE_ACCOUNT,\n", "        FILE_URIS: list = [GCS_STAGING_LOCATION + \"/\" + LOG4J_PROPERTIES]\n", "    ):\n", "        for table in EXECUTION_LIST:\n", "            BATCH_ID = \"ora2spanner-{}-{}\".format(table, datetime.now().strftime(\"%s\")).replace('_', '-').lower()\n", "            oracle_to_spanner_jobs.append(BATCH_ID)\n", "            if table in PARTITION_OPTIONS.keys():\n", "                partition_options = PARTITION_OPTIONS[table]\n", "                TEMPLATE_SPARK_ARGS = [\n", "                    \"--template=JDBCTOSPANNER\",\n", "                    \"--templateProperty\", \"project.id={}\".format(PROJECT),\n", "                    \"--templateProperty\", \"jdbctospanner.jdbc.url={}\".format(JDBC_URL),\n", "                    \"--templateProperty\", \"jdbctospanner.jdbc.driver.class.name={}\".format(JDBC_DRIVER),\n", "                    \"--templateProperty\", \"jdbctospanner.jdbc.fetchsize={}\".format(JDBC_FETCH_SIZE),\n", "                    \"--templateProperty\", \"jdbctospanner.sql=select * from {}.{}\".format(ORACLE_SCHEMA, table),\n",
 "                    \"--templateProperty\", \"jdbctospanner.output.instance={}\".format(SPANNER_INSTANCE),\n", "                    \"--templateProperty\", \"jdbctospanner.output.database={}\".format(SPANNER_DATABASE),\n", "                    \"--templateProperty\", \"jdbctospanner.output.table={}\".format(table),\n", "                    \"--templateProperty\", \"jdbctospanner.output.saveMode={}\".format(SPANNER_OUTPUT_MODE),\n", "                    \"--templateProperty\", \"jdbctospanner.output.primaryKey={}\".format(SPANNER_TABLE_PRIMARY_KEYS[table]),\n", "                    \"--templateProperty\", \"jdbctospanner.output.batchInsertSize=200\",\n", "                    \"--templateProperty\", \"jdbctospanner.sql.partitionColumn={}\".format(partition_options[jdbc_input_manager_interface.SPARK_PARTITION_COLUMN]),\n", "                    \"--templateProperty\", \"jdbctospanner.sql.lowerBound={}\".format(partition_options[jdbc_input_manager_interface.SPARK_LOWER_BOUND]),\n", "                    \"--templateProperty\", \"jdbctospanner.sql.upperBound={}\".format(partition_options[jdbc_input_manager_interface.SPARK_UPPER_BOUND]),\n", "                    \"--templateProperty\", \"jdbctospanner.sql.numPartitions={}\".format(partition_options[jdbc_input_manager_interface.SPARK_NUM_PARTITIONS]),\n", "                ]\n", "            else:\n", "                TEMPLATE_SPARK_ARGS = [\n", "                    \"--template=JDBCTOSPANNER\",\n", "                    \"--templateProperty\", \"project.id={}\".format(PROJECT),\n", "                    \"--templateProperty\", \"jdbctospanner.jdbc.url={}\".format(JDBC_URL),\n", "                    \"--templateProperty\", \"jdbctospanner.jdbc.driver.class.name={}\".format(JDBC_DRIVER),\n", "                    \"--templateProperty\", \"jdbctospanner.jdbc.fetchsize={}\".format(JDBC_FETCH_SIZE),\n", "                    \"--templateProperty\", \"jdbctospanner.sql=select * from {}.{}\".format(ORACLE_SCHEMA, table),\n", "                    \"--templateProperty\", \"jdbctospanner.output.instance={}\".format(SPANNER_INSTANCE),\n", "                    \"--templateProperty\", \"jdbctospanner.output.database={}\".format(SPANNER_DATABASE),\n", "                    \"--templateProperty\", \"jdbctospanner.output.table={}\".format(table),\n", "                    \"--templateProperty\", \"jdbctospanner.output.saveMode={}\".format(SPANNER_OUTPUT_MODE),\n", "                    \"--templateProperty\", \"jdbctospanner.output.primaryKey={}\".format(SPANNER_TABLE_PRIMARY_KEYS[table]),\n", "                    \"--templateProperty\", \"jdbctospanner.output.batchInsertSize=200\",\n", "                ]\n", "            if JDBC_SESSION_INIT_STATEMENT:\n", "                # Pass the session init statement as a template property, consistent with the other properties above.\n", "                TEMPLATE_SPARK_ARGS.extend([\"--templateProperty\", \"jdbctospanner.input.sessioninitstatement={}\".format(JDBC_SESSION_INIT_STATEMENT)])\n", "\n", "            _ = DataprocSparkBatchOp(\n", "                project=PROJECT_ID,\n", "                location=LOCATION,\n", "                batch_id=BATCH_ID,\n", "                main_class=MAIN_CLASS,\n", "                jar_file_uris=JAR_FILE_URIS,\n", "                file_uris=FILE_URIS,\n", "                subnetwork_uri=SUBNETWORK_URI,\n", "                service_account=SERVICE_ACCOUNT,\n", "                runtime_config_version=\"1.1\", # issue 665\n", "                args=TEMPLATE_SPARK_ARGS\n", "            )\n", "            time.sleep(3)\n", "\n", "    compiler.Compiler().compile(pipeline_func=pipeline, package_path=\"pipeline.json\")\n", "\n", "    pipeline = aiplatform.PipelineJob(\n", "        display_name=\"pipeline\",\n", "        template_path=\"pipeline.json\",\n", "        pipeline_root=PIPELINE_ROOT,\n", "        enable_caching=False,\n", "    )\n", "    # run() method has an optional parameter `service_account` which you can pass if you want to run the pipeline using a\n", "    # specific service account instead of the default service account\n", "    # eg. 
pipeline.run(service_account='test@project_id.iam.gserviceaccount.com')\n", "    pipeline.run()" ] },
{ "cell_type": "code", "execution_count": null, "id": "44205b54-1ac7-42f3-85ad-5b20f531056b", "metadata": { "tags": [] }, "outputs": [], "source": [ "for execution_list in JOB_LIST:\n", "    print(execution_list)\n", "    migrate_oracle_to_spanner(execution_list)" ] },
{ "attachments": {}, "cell_type": "markdown", "id": "9ce7f828-dacc-404b-8927-dc3813e7216a", "metadata": {}, "source": [ "## Step 10: Get Status for Tables Migrated from Oracle to Cloud Spanner" ] },
{ "cell_type": "code", "execution_count": null, "id": "b611510f-271c-447a-899d-42fbb983268d", "metadata": { "tags": [] }, "outputs": [], "source": [ "def get_bearer_token():\n", "    try:\n", "        # Defining scope\n", "        CREDENTIAL_SCOPES = [\"https://www.googleapis.com/auth/cloud-platform\"]\n", "\n", "        # Assigning credentials and project value\n", "        credentials, project_id = google.auth.default(scopes=CREDENTIAL_SCOPES)\n", "\n", "        # Refreshing credentials data\n", "        credentials.refresh(requests.Request())\n", "\n", "        # Get refreshed token\n", "        token = credentials.token\n", "        if token:\n", "            return (token, 200)\n", "        else:\n", "            return (\"Bearer token not generated\", 500)\n", "    except Exception as error:\n", "        return (\"Bearer token not generated. Error : {}\".format(error), 500)" ] },
{ "cell_type": "code", "execution_count": null, "id": "d1fcbc63-19db-42a8-a2ed-d9855da00c04", "metadata": { "tags": [] }, "outputs": [], "source": [ "from google.auth.transport import requests\n", "import google.auth\n", "token = get_bearer_token()\n", "if token[1] == 200:\n", "    print(\"Bearer token generated\")\n", "else:\n", "    print(token)" ] },
{ "cell_type": "code", "execution_count": null, "id": "5be3cf87-6d28-4b23-8466-87d3399f7a29", "metadata": { "tags": [] }, "outputs": [], "source": [ "import requests\n", "\n", "oracle_to_spanner_status = []\n", "job_status_url = \"https://dataproc.googleapis.com/v1/projects/{}/locations/{}/batches/{}\"\n", "for job in oracle_to_spanner_jobs:\n", "    auth = \"Bearer \" + token[0]\n", "    url = job_status_url.format(PROJECT, REGION, job)\n", "    headers = {\n", "        'Content-Type': 'application/json; charset=UTF-8',\n", "        'Authorization': auth\n", "    }\n", "    response = requests.get(url, headers=headers)\n", "    oracle_to_spanner_status.append(response.json()['state'])" ] },
{ "cell_type": "code", "execution_count": null, "id": "1097575d-07c2-4659-a75f-d7e898e3f077", "metadata": { "tags": [] }, "outputs": [], "source": [ "statusDF = pd.DataFrame({\"table\": ORACLE_TABLE_LIST, \"oracle_to_spanner_job\": oracle_to_spanner_jobs, \"oracle_to_spanner_status\": oracle_to_spanner_status})\n", "statusDF" ] },
{ "attachments": {}, "cell_type": "markdown", "id": "0961f164-c7e4-4bb5-80f0-25fd1051147b", "metadata": {}, "source": [ "## Step 11: Validate Row Counts of Migrated Tables from Oracle to Cloud Spanner" ] },
{ "cell_type": "code", "execution_count": null, "id": "25299344-c167-4764-a5d1-56c1b384d104", "metadata": { "tags": [] }, "outputs": [], "source": [ "# Get Oracle table counts\n", "oracle_row_count = input_mgr.get_table_list_with_counts()" ] },
{ "cell_type": "code", "execution_count": null, "id": "ab0e539d-5180-4f5b-915e-35f7ea45e0d3", "metadata": { "tags": [] }, "outputs": [], "source": [ "# Get Cloud Spanner table counts\n", "from google.cloud import spanner\n", "\n", "spanner_client = spanner.Client()\n", "instance = spanner_client.instance(SPANNER_INSTANCE)\n", "database = instance.database(SPANNER_DATABASE)\n", "\n", "spanner_row_count = 
[]\n", "for table in ORACLE_TABLE_LIST:\n", "    with database.snapshot() as snapshot:\n", "        results = snapshot.execute_sql(\"select count(*) from {}\".format(table))\n", "        for row in results:\n", "            spanner_row_count.append(row[0])" ] },
{ "cell_type": "code", "execution_count": null, "id": "4b1afe12-3eb9-4133-8377-66dc63ac649c", "metadata": { "tags": [] }, "outputs": [], "source": [ "statusDF['oracle_row_count'] = oracle_row_count\n", "statusDF['spanner_row_count'] = spanner_row_count\n", "statusDF" ] },
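{ "cell_type": "code", "execution_count": null, "id": "e8f9a0b1", "metadata": {}, "outputs": [], "source": [ "# Optional verification helper (a minimal sketch, not part of the original template flow):\n", "# flags any table whose Cloud Spanner row count does not match the Oracle row count above.\n", "statusDF['counts_match'] = statusDF['oracle_row_count'] == statusDF['spanner_row_count']\n", "statusDF[statusDF['counts_match'] == False]" ] } ],
"metadata": { "environment": { "kernel": "python3", "name": "common-cpu.m97", "type": "gcloud", "uri": "gcr.io/deeplearning-platform-release/base-cpu:m97" }, "kernelspec": { "display_name": "Python 3 (ipykernel) (Local)", "language": "python", "name": "python3" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 3 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.9.9" }, "vscode": { "interpreter": { "hash": "31f2aee4e71d21fbe5cf8b01ff0e069b9275f58929596ceb00d14d90e3e16cd6" } } }, "nbformat": 4, "nbformat_minor": 5 }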