{
"cells": [
{
"cell_type": "code",
"execution_count": null,
"id": "98acd907",
"metadata": {},
"outputs": [],
"source": [
"# Copyright 2022 Google LLC\n",
"#\n",
"# Licensed under the Apache License, Version 2.0 (the \"License\");\n",
"# you may not use this file except in compliance with the License.\n",
"# You may obtain a copy of the License at\n",
"#\n",
"# https://www.apache.org/licenses/LICENSE-2.0\n",
"#\n",
"# Unless required by applicable law or agreed to in writing, software\n",
"# distributed under the License is distributed on an \"AS IS\" BASIS,\n",
"# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.\n",
"# See the License for the specific language governing permissions and\n",
"# limitations under the License.\n"
]
},
{
"attachments": {},
"cell_type": "markdown",
"id": "1db5371a-8f16-47b7-bcc7-5af386e9b6d8",
"metadata": {},
"source": [
"# <center>Oracle to PostgreSQL\n",
"<table align=\"left\">\n",
"<td>\n",
" <a href=\"https://colab.research.google.com/github/GoogleCloudPlatform/dataproc-templates/blob/main/notebooks/oracle2postgres/OracleToPostgres_notebook.ipynb\">\n",
" <img src=\"../images/colab-logo-32px.png\" alt=\"Colab logo\" />Run in Colab\n",
" </a>\n",
"</td>\n",
"<td>\n",
" <a href=\"https://console.cloud.google.com/vertex-ai/colab/import/https:%2F%2Fraw.githubusercontent.com%2FGoogleCloudPlatform%2Fdataproc-templates%2Fmain%2Fnotebooks%2Foracle2postgres%2FOracleToPostgres_notebook.ipynb\">\n",
" <img src=\"../images/colab-enterprise-logo-32px.png\" alt=\"GCP Colab Enterprise logo\" />Run in Colab Enterprise\n",
" </a>\n",
"</td>\n",
"<td>\n",
" <a href=\"https://github.com/GoogleCloudPlatform/dataproc-templates/blob/main/notebooks/oracle2postgres/OracleToPostgres_notebook.ipynb\">\n",
" <img src=\"../images/github-logo-32px.png\" alt=\"GitHub logo\" />View on GitHub\n",
" </a>\n",
"</td>\n",
"<td>\n",
" <a href=\"https://console.cloud.google.com/vertex-ai/workbench/deploy-notebook?download_url=https://raw.githubusercontent.com/GoogleCloudPlatform/dataproc-templates/main/notebooks/oracle2postgres/OracleToPostgres_notebook.ipynb\">\n",
" <img src=\"../images/vertexai.jpg\" alt=\"Vertex AI logo\" />Open in Vertex AI Workbench\n",
" </a>\n",
"</td>\n",
"</table>"
]
},
{
"attachments": {},
"cell_type": "markdown",
"id": "dd944742",
"metadata": {},
"source": [
"#### References\n",
"\n",
"- [DataprocPySparkBatchOp reference](https://google-cloud-pipeline-components.readthedocs.io/en/google-cloud-pipeline-components-1.0.0/google_cloud_pipeline_components.experimental.dataproc.html)\n",
"- [Kubeflow SDK Overview](https://www.kubeflow.org/docs/components/pipelines/sdk/sdk-overview/)\n",
"- [Dataproc Serverless in Vertex AI Pipelines tutorial](https://github.com/GoogleCloudPlatform/vertex-ai-samples/blob/main/notebooks/community/ml_ops/stage3/get_started_with_dataproc_serverless_pipeline_components.ipynb)\n",
"- [Build a Vertex AI Pipeline](https://cloud.google.com/vertex-ai/docs/pipelines/build-pipeline)\n",
"\n",
"#### Overview - Oracle to PostgreSQL Migration\n",
"\n",
"This notebook helps with the step by step process of migrating Oracle database tables to PostgreSQL using Dataproc template.\n",
"\n",
"#### Permissions\n",
"\n",
"This notebook is built to run a Vertex AI User-Managed Notebook using the default Compute Engine Service Account. \n",
"Check the Dataproc Serverless in Vertex AI Pipelines tutorial linked above to learn how to setup a different Service Account. If using custom service account, service account attached to Vertex AI notebook should have Service Account User role to use custom role in job.\n",
"\n",
"Make sure that the service account used to run the notebook has the following roles:\n",
"\n",
"- roles/aiplatform.serviceAgent\n",
"- roles/aiplatform.customCodeServiceAgent\n",
"- roles/storage.objectCreator\n",
"- roles/storage.objectViewer\n",
"- roles/dataproc.editor\n",
"- roles/dataproc.worker\n",
"- roles/bigquery.dataEditor\n"
]
},
{
"attachments": {},
"cell_type": "markdown",
"id": "740926fd-d602-4420-a362-d0646b5e11ce",
"metadata": {},
"source": [
"## Contact\n",
"\n",
"Share you feedback, ideas, thoughts [feedback-form](https://forms.gle/XXCJeWeCJJ9fNLQS6) \n",
"Questions, issues, and comments should be directed to dataproc-templates-support-external@googlegroups.com\n"
]
},
{
"attachments": {},
"cell_type": "markdown",
"id": "7d89b301-5249-462a-97d8-986488b303fd",
"metadata": {},
"source": [
"## Step 1: Install Libraries\n",
"\n",
"#### Run Step 1 one time for each new notebook instance\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "fef65ec2-ad6b-407f-a993-7cdf871bba11",
"metadata": {
"tags": []
},
"outputs": [],
"source": [
"!pip3 install SQLAlchemy\n",
"!pip3 install --upgrade google-cloud-pipeline-components kfp --user -q\n",
"!sudo apt-get install libpq-dev --yes\n",
"!pip3 install psycopg2\n",
"!pip3 install cx-Oracle"
]
},
{
"attachments": {},
"cell_type": "markdown",
"id": "2fe7de03-566e-4902-97e9-eb9c9c4a8d8f",
"metadata": {},
"source": [
"#### Oracle Client Installation\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "82bc9e6c-408e-4e87-9e44-82dff4f97969",
"metadata": {
"scrolled": true,
"tags": []
},
"outputs": [],
"source": [
"%%bash\n",
"sudo mkdir -p /opt/oracle\n",
"sudo rm -fr /opt/oracle/instantclient*\n",
"cd /opt/oracle\n",
"sudo wget --no-verbose https://download.oracle.com/otn_software/linux/instantclient/instantclient-basic-linuxx64.zip\n",
"sudo unzip instantclient-basic-linuxx64.zip\n",
"INSTANT_CLIENT_DIR=$(find /opt/oracle -maxdepth 1 -type d -name \"instantclient_[0-9]*_[0-9]*\" | sort | tail -1)\n",
"test -n \"${INSTANT_CLIENT_DIR}\" || echo \"ERROR: Could not find instant client\"\n",
"test -n \"${INSTANT_CLIENT_DIR}\" || exit 1\n",
"sudo apt-get install libaio1\n",
"sudo sh -c \"echo ${INSTANT_CLIENT_DIR} > /etc/ld.so.conf.d/oracle-instantclient.conf\"\n",
"sudo ldconfig\n",
"export LD_LIBRARY_PATH=${INSTANT_CLIENT_DIR}:$LD_LIBRARY_PATH"
]
},
{
"attachments": {},
"cell_type": "markdown",
"id": "31456ac6-d058-4d90-8b18-c10f92922f81",
"metadata": {},
"source": [
"#### Once you've installed the additional packages, you may need to restart the notebook kernel so it can find the packages.\n",
"\n",
"Uncomment & Run this cell if you have installed anything from above commands\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "19ef1307-902e-4713-8948-b86084e19312",
"metadata": {},
"outputs": [],
"source": [
"# import os\n",
"# import IPython\n",
"# if not os.getenv(\"IS_TESTING\"):\n",
"# app = IPython.Application.instance()\n",
"# app.kernel.do_shutdown(True)\n"
]
},
{
"attachments": {},
"cell_type": "markdown",
"id": "70d01e33-9099-4d2e-b57e-575c3a998d84",
"metadata": {},
"source": [
"## Step 2: Import Libraries\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "2703b502-1b41-44f1-bf21-41069255bc32",
"metadata": {
"tags": []
},
"outputs": [],
"source": [
"from datetime import datetime\n",
"import os\n",
"from pathlib import Path\n",
"import sys\n",
"import time\n",
"\n",
"import google.cloud.aiplatform as aiplatform\n",
"from kfp import dsl\n",
"from kfp import compiler\n",
"\n",
"try:\n",
" from google_cloud_pipeline_components.experimental.dataproc import DataprocPySparkBatchOp\n",
"except ModuleNotFoundError:\n",
" from google_cloud_pipeline_components.v1.dataproc import DataprocPySparkBatchOp\n",
" \n",
"import pandas as pd\n",
"import sqlalchemy\n",
"\n",
"module_path = os.path.abspath(os.pardir)\n",
"if module_path not in sys.path:\n",
" sys.path.append(module_path)\n",
"\n",
"from util.jdbc.jdbc_input_manager import JDBCInputManager\n",
"from util.jdbc import jdbc_input_manager_interface\n",
"from util import notebook_functions"
]
},
{
"attachments": {},
"cell_type": "markdown",
"id": "09c4a209-db59-42f6-bba7-30cd46b16bad",
"metadata": {},
"source": [
"## Step 3: Assign Parameters\n"
]
},
{
"attachments": {},
"cell_type": "markdown",
"id": "92d3fbd8-013f-45e6-b7e9-8f31a4580e91",
"metadata": {},
"source": [
"### Step 3.1 Common Parameters\n",
"\n",
"- PROJECT : GCP project-id\n",
"- REGION : GCP region\n",
"- GCS_STAGING_LOCATION : Cloud Storage staging location to be used for this notebook to store artifacts\n",
"- SUBNET : VPC subnet\n",
"- JARS : List of jars. For this notebook PostgreSQL connector jar and Oracle JDBC driver are required in addition with the Dataproc template jars\n",
"- MAX_PARALLELISM : Parameter for number of jobs to run in parallel default value is 5\n",
"- SERVICE_ACCOUNT : Custom service account email to use for vertex ai pipeline and dataproc job with above mentioned permissions\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "160b0650",
"metadata": {
"tags": [
"parameters"
]
},
"outputs": [],
"source": [
"IS_PARAMETERIZED = False\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "1b9f42b4-4fc7-48de-8689-bc0f09d155f9",
"metadata": {},
"outputs": [],
"source": [
"if not IS_PARAMETERIZED:\n",
" PROJECT = \"<project-id>\"\n",
" REGION = \"<region>\"\n",
" GCS_STAGING_LOCATION = \"<gs://bucket/[folder]>\"\n",
" SUBNET = \"projects/{project}/regions/{region}/subnetworks/{subnet}\"\n",
" MAX_PARALLELISM = 5\n",
" SERVICE_ACCOUNT = \"\"\n",
"\n",
"OJDBC_JAR = \"ojdbc8-21.7.0.0.jar\" # For Oracle 11g use ojdbc6-11.2.0.4.jar\n",
"# Do not change this parameter unless you want to refer below JARS from new location\n",
"JARS = [\n",
" f\"{GCS_STAGING_LOCATION}/jars/{OJDBC_JAR}\",\n",
" GCS_STAGING_LOCATION + \"/jars/postgresql-42.2.6.jar\",\n",
"]\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "4a87c4e3-a4bd-44bd-8120-99097383bcec",
"metadata": {},
"outputs": [],
"source": [
"# If SERVICE_ACCOUNT is not specified it will take the one attached to Notebook\n",
"if SERVICE_ACCOUNT == '':\n",
" shell_output = !gcloud auth list 2>/dev/null\n",
" SERVICE_ACCOUNT = shell_output[2].replace(\"*\", \"\").strip()\n",
" print(\"Service Account: \",SERVICE_ACCOUNT)"
]
},
{
"attachments": {},
"cell_type": "markdown",
"id": "051df2af-bd8b-47c7-8cb2-05404ca0d859",
"metadata": {},
"source": [
"### Step 3.2 Oracle to PostgreSQL Parameters\n",
"\n",
"- ORACLE_HOST: Oracle instance ip address\n",
"- ORACLE_PORT: Oracle instance port\n",
"- ORACLE_USERNAME: Oracle username\n",
"- ORACLE_PASSWORD: Oracle password\n",
"- ORACLE_DATABASE: Name of database/service for Oracle connection\n",
"- ORACLE_SCHEMA: Schema to be exported, leave blank to export tables owned by ORACLE_USERNAME\n",
"- ORACLE_TABLE_LIST: List of tables to migrate e.g.: ['table1', 'table2'] else provide an empty list for migration whole database e.g.: [] \n",
"- ORACLE_READ_PARTITION_COLUMNS: Dictionary of custom read partition columns, e.g.: {'table2': 'secondary_id'}"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "a4c61a12-7f39-41df-ace8-5d5c573680f1",
"metadata": {
"tags": []
},
"outputs": [],
"source": [
"if not IS_PARAMETERIZED:\n",
" ORACLE_HOST = \"\"\n",
" ORACLE_PORT = \"1521\"\n",
" ORACLE_USERNAME = \"\"\n",
" ORACLE_PASSWORD = \"\"\n",
" ORACLE_DATABASE = \"\"\n",
" ORACLE_SCHEMA = \"\" # Leave empty to default to ORACLE_USERNAME\n",
" ORACLE_TABLE_LIST = (\n",
" []\n",
" ) # Leave list empty for migrating complete database else provide tables as ['table1', 'table2']\n",
" ORACLE_READ_PARTITION_COLUMNS = {} # Leave empty to default read partition columns\n"
]
},
{
"attachments": {},
"cell_type": "markdown",
"id": "17131137-9cd1-41a0-a942-52c3fa9f34e6",
"metadata": {},
"source": [
"### Step 3.3 PostgreSQL Parameters\n",
"\n",
"- POSTGRES_HOST: POSTGRES instance ip address\n",
"- POSTGRES_PORT: POSTGRES instance port\n",
"- POSTGRES_USERNAME: POSTGRES username\n",
"- POSTGRES_PASSWORD: POSTGRES password\n",
"- POSTGRES_DATABASE: name of database that you want to migrate\n",
"- POSTGRES_SCHEMA: name of schema that you want to migrate\n",
"- JDBCTOJDBC_OUTPUT_MODE: Output write mode (one of: append,overwrite,ignore,errorifexists)(Defaults to overwrite)\n",
"- JDBCTOJDBC_BATCH_SIZE: JDBC output batch size. Default set to 1000"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "77dd8c35-8c55-40e2-ae5b-9a7e6a13bf50",
"metadata": {},
"outputs": [],
"source": [
"if not IS_PARAMETERIZED:\n",
" POSTGRES_HOST = \"<host>\"\n",
" POSTGRES_PORT = \"<port>\"\n",
" POSTGRES_USERNAME = \"<username>\"\n",
" POSTGRES_PASSWORD = \"<password>\"\n",
" POSTGRES_DATABASE = \"<database>\"\n",
" POSTGRES_SCHEMA = \"<schema>\"\n",
" JDBCTOJDBC_OUTPUT_MODE = (\n",
" \"<modeoverwrite>\" # one of append/overwrite/ignore/errorifexists\n",
" )\n",
" JDBCTOJDBC_OUTPUT_BATCH_SIZE = \"1000\""
]
},
{
"attachments": {},
"cell_type": "markdown",
"id": "166b1536-d58e-423b-b3c2-cc0c171d275e",
"metadata": {
"tags": []
},
"source": [
"### Step 3.4 Notebook Configuration Parameters\n",
"\n",
"Below variables should not be changed unless required\n",
"\n",
"- ORACLE_URL: Oracle Python URL\n",
"- JDBC_DRIVER: JDBC driver class\n",
"- JDBC_URL: Oracle JDBC URL\n",
"- JDBC_FETCH_SIZE: Determines how many rows to fetch per round trip\n",
"- JDBC_SESSION_INIT_STATEMENT: Custom SQL statement to execute in each reader database session\n",
"- MAIN_CLASS: Dataproc Template Main Class\n",
"- WORKING_DIRECTORY: Python working directory\n",
"- PACKAGE_EGG_FILE: Dataproc Template distributio file\n",
"- PIPELINE_ROOT: Path to Vertex AI pipeline artifacts"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "377a2cd7",
"metadata": {},
"outputs": [],
"source": [
"cur_path = Path(os.getcwd())\n",
"if IS_PARAMETERIZED:\n",
" WORKING_DIRECTORY = os.path.join(cur_path.parent, \"python\")\n",
"else:\n",
" WORKING_DIRECTORY = os.path.join(cur_path.parent.parent, \"python\")\n",
"\n",
"# If the above code doesn't fetches the correct path please\n",
"# provide complete path to python folder in your dataproc\n",
"# template repo which you cloned\n",
"\n",
"# WORKING_DIRECTORY = \"/home/jupyter/dataproc-templates/python/\"\n",
"print(WORKING_DIRECTORY)"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "c6f0f037-e888-4479-a143-f06a39bd5cc1",
"metadata": {
"tags": []
},
"outputs": [],
"source": [
"ORACLE_URL = \"oracle://{}:{}@{}:{}?service_name={}\".format(\n",
" ORACLE_USERNAME, ORACLE_PASSWORD, ORACLE_HOST, ORACLE_PORT, ORACLE_DATABASE\n",
")\n",
"JDBC_INPUT_DRIVER = \"oracle.jdbc.OracleDriver\"\n",
"JDBC_INPUT_URL = \"jdbc:oracle:thin:{}/{}@{}:{}/{}\".format(\n",
" ORACLE_USERNAME, ORACLE_PASSWORD, ORACLE_HOST, ORACLE_PORT, ORACLE_DATABASE\n",
")\n",
"MAIN_CLASS = \"com.google.cloud.dataproc.templates.main.DataProcTemplate\"\n",
"WORKING_DIRECTORY = \"/home/jupyter/dataproc-templates/python/\"\n",
"JDBC_OUTPUT_DRIVER = \"org.postgresql.Driver\"\n",
"JDBC_OUTPUT_URL = \"jdbc:postgresql://{0}:{1}/{2}?user={3}&password={4}&reWriteBatchedInserts=true\".format(\n",
" POSTGRES_HOST,\n",
" POSTGRES_PORT,\n",
" POSTGRES_DATABASE,\n",
" POSTGRES_USERNAME,\n",
" POSTGRES_PASSWORD,\n",
")\n",
"PACKAGE_EGG_FILE = \"dataproc_templates_distribution.egg\"\n",
"\n",
"JDBC_SESSION_INIT_STATEMENT = \"BEGIN DBMS_APPLICATION_INFO.SET_MODULE('Dataproc Templates','OracleToBigQuery Notebook'); END;\"\n",
"JDBC_FETCH_SIZE = 200\n",
"\n",
"PIPELINE_ROOT = GCS_STAGING_LOCATION + \"/pipeline_root/dataproc_pyspark\"\n",
"MAIN_PYTHON_FILE = GCS_STAGING_LOCATION + \"/main.py\"\n",
"PYTHON_FILE_URIS = [GCS_STAGING_LOCATION + \"/dataproc_templates_distribution.egg\"]"
]
},
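{
"attachments": {},
"cell_type": "markdown",
"id": "3c2f1d0a",
"metadata": {},
"source": [
"#### Optional: connectivity sanity check\n",
"\n",
"A minimal sketch (not part of the original template flow) to confirm the Oracle and PostgreSQL parameters above are valid before launching any Spark jobs. It assumes the packages installed in Step 1 and the Oracle Instant Client are available in this notebook environment.\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "3c2f1d0b",
"metadata": {},
"outputs": [],
"source": [
"# Optional sanity check (sketch): verify connectivity with the parameters defined above.\n",
"import psycopg2\n",
"\n",
"oracle_check = sqlalchemy.create_engine(ORACLE_URL).connect()\n",
"oracle_check.close()\n",
"print(\"Oracle connection OK\")\n",
"\n",
"postgres_check = psycopg2.connect(\n",
" user=POSTGRES_USERNAME,\n",
" password=POSTGRES_PASSWORD,\n",
" dbname=POSTGRES_DATABASE,\n",
" host=POSTGRES_HOST,\n",
" port=POSTGRES_PORT,\n",
")\n",
"postgres_check.close()\n",
"print(\"PostgreSQL connection OK\")"
]
},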
{
"attachments": {},
"cell_type": "markdown",
"id": "115c062b-5a91-4372-b440-5c37a12fbf87",
"metadata": {},
"source": [
"## Step 4: Generate Oracle Table List\n",
"This step creates list of tables for migration. If ORACLE_TABLE_LIST is kept empty all the tables in the ORACLE_DATABASE are listed for migration otherwise the provided list is used\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "d0e362ac-30cd-4857-9e2a-0e9eb926e627",
"metadata": {
"tags": []
},
"outputs": [],
"source": [
"input_mgr = JDBCInputManager.create(\"oracle\", sqlalchemy.create_engine(ORACLE_URL))\n",
"\n",
"# Retrieve list of tables from database.\n",
"ORACLE_TABLE_LIST = input_mgr.build_table_list(schema_filter=ORACLE_SCHEMA, table_filter=ORACLE_TABLE_LIST)\n",
"ORACLE_SCHEMA = input_mgr.get_schema()\n",
"print(f\"Total tables to migrate from schema {ORACLE_SCHEMA}:\", len(ORACLE_TABLE_LIST))\n",
"\n",
"print(\"List of tables for migration:\")\n",
"print(ORACLE_TABLE_LIST)"
]
},
{
"attachments": {},
"cell_type": "markdown",
"id": "1d9a62e8-7499-41c6-b32b-73b539b0c7c4",
"metadata": {},
"source": [
"## Step 5: Identify Read Partition Columns\n",
"This step uses PARTITION_THRESHOLD (default value is 1 million) parameter and any table having rows greater than PARTITION_THRESHOLD will be used for partitioned read based on Primary Keys\n",
" - PARTITION_OPTIONS: List will have table and its partitioned column and Spark SQL settings if exceeds the threshold"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "93608d0f-ef97-4700-b360-cfb3444e88c4",
"metadata": {},
"outputs": [],
"source": [
"PARTITION_THRESHOLD = 1000000\n",
"PARTITION_OPTIONS = input_mgr.define_read_partitioning(\n",
" PARTITION_THRESHOLD, custom_partition_columns=ORACLE_READ_PARTITION_COLUMNS\n",
")\n",
"input_mgr.read_partitioning_df(PARTITION_OPTIONS)"
]
},
{
"attachments": {},
"cell_type": "markdown",
"id": "1fa5f841-a687-4723-a8e6-6e7e752ba36e",
"metadata": {
"tags": []
},
"source": [
"## Step 6: Download JAR files and Upload to Cloud Storage (only required to run one-time)\n",
"#### Run Step 6 one time for each new notebook instance"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "22220ae3-9fb4-471c-b5aa-f606deeca15e",
"metadata": {
"tags": []
},
"outputs": [],
"source": [
"%cd $WORKING_DIRECTORY"
]
},
{
"attachments": {},
"cell_type": "markdown",
"id": "bdee7afc-699b-4c1a-aeec-df0f99764ae0",
"metadata": {},
"source": [
"#### Downloading JDBC Oracle Driver and Postgres Jar files\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "4b40f634-1983-4267-a4c1-b072bf6d81ae",
"metadata": {
"scrolled": true,
"tags": []
},
"outputs": [],
"source": [
"OJDBC_PATH = os.path.splitext(OJDBC_JAR)[0].replace(\"-\", \"/\")\n",
"! wget --no-verbose https://repo1.maven.org/maven2/com/oracle/database/jdbc/$OJDBC_PATH/$OJDBC_JAR\n",
"! wget --no-verbose https://jdbc.postgresql.org/download/postgresql-42.2.6.jar"
]
},
{
"attachments": {},
"cell_type": "markdown",
"id": "6e6d6813-36f0-45bc-a784-217842635138",
"metadata": {},
"source": [
"#### Build Dataproc Templates python package\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "06735281-1c22-48d3-9b71-20f9ca99d04a",
"metadata": {
"scrolled": true,
"tags": []
},
"outputs": [],
"source": [
"! python ./setup.py bdist_egg --output=$PACKAGE_EGG_FILE"
]
},
{
"attachments": {},
"cell_type": "markdown",
"id": "9e1a779f-2c39-42ec-98be-0f5e9d715447",
"metadata": {},
"source": [
"#### Copying JARS files to GCS_STAGING_LOCATION\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "939cdcd5-0f3e-4f51-aa78-93d1976cb0f4",
"metadata": {
"tags": []
},
"outputs": [],
"source": [
"! gsutil cp main.py $GCS_STAGING_LOCATION/\n",
"! gsutil cp -r $PACKAGE_EGG_FILE $GCS_STAGING_LOCATION/\n",
"! gsutil cp $OJDBC_JAR $GCS_STAGING_LOCATION/jars/$OJDBC_JAR\n",
"! gsutil cp postgresql-42.2.6.jar $GCS_STAGING_LOCATION/jars/postgresql-42.2.6.jar"
]
},
{
"attachments": {},
"cell_type": "markdown",
"id": "0d9bb170-09c4-40d1-baaf-9e907f215889",
"metadata": {},
"source": [
"## Step 7: Calculate Parallel Jobs for Oracle to PostgreSQL\n",
"\n",
"This step uses MAX_PARALLELISM parameter to calculate number of parallel jobs to run\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "2c501db0-c1fb-4a05-88b8-a7e546e2b1d0",
"metadata": {
"tags": []
},
"outputs": [],
"source": [
"# Calculate parallel jobs:\n",
"JOB_LIST = notebook_functions.split_list(input_mgr.get_table_list(), MAX_PARALLELISM)\n",
"print(\"List of tables for execution:\")\n",
"print(JOB_LIST)"
]
},
{
"attachments": {},
"cell_type": "markdown",
"id": "7e31a5e7-f8d4-4474-816f-13217dfa5fad",
"metadata": {},
"source": [
"## Step 8:Create Source Schema in PostgreSQL\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "dac53ca9-75a1-48d7-82a3-9674a1fbdec0",
"metadata": {},
"outputs": [],
"source": [
"import psycopg2\n",
"\n",
"postgresDB = psycopg2.connect(\n",
" user=POSTGRES_USERNAME,\n",
" password=POSTGRES_PASSWORD,\n",
" dbname=POSTGRES_DATABASE,\n",
" host=POSTGRES_HOST,\n",
" port=POSTGRES_PORT,\n",
")\n",
"postgresDB.autocommit = True\n",
"conn = postgresDB.cursor()\n",
"conn.execute(\"\"\"CREATE SCHEMA IF NOT EXISTS {};\"\"\".format(POSTGRES_SCHEMA))\n",
"conn.close()"
]
},
{
"attachments": {},
"cell_type": "markdown",
"id": "63075f47-4d9f-4237-8833-aa29eebb09f4",
"metadata": {},
"source": [
"## Step 9: Execute Pipeline to Migrate tables from Oracle to PostgreSQL\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "22f7d00c-bd2a-4aaa-aee4-e41a9793dfb1",
"metadata": {
"tags": []
},
"outputs": [],
"source": [
"oracle_to_postgres_jobs = []\n",
"\n",
"def migrate_oracle_to_postgres(EXECUTION_LIST):\n",
" EXECUTION_LIST = EXECUTION_LIST\n",
" aiplatform.init(project=PROJECT, staging_bucket=GCS_STAGING_LOCATION)\n",
"\n",
" @dsl.pipeline(\n",
" name=\"python-oracle-to-postgres-pyspark\",\n",
" description=\"Pipeline to get data from Oracle to PostgreSQL\",\n",
" )\n",
" def pipeline(\n",
" PROJECT_ID: str = PROJECT,\n",
" LOCATION: str = REGION,\n",
" MAIN_PYTHON_CLASS: str = MAIN_PYTHON_FILE,\n",
" PYTHON_FILE_URIS: list = PYTHON_FILE_URIS,\n",
" JAR_FILE_URIS: list = JARS,\n",
" SUBNETWORK_URI: str = SUBNET,\n",
" ):\n",
" for table in EXECUTION_LIST:\n",
" BATCH_ID = \"oracle2postgres-{}\".format(datetime.now().strftime(\"%s\"))\n",
" oracle_to_postgres_jobs.append(BATCH_ID)\n",
"\n",
" if table in PARTITION_OPTIONS.keys():\n",
" partition_options = PARTITION_OPTIONS[table]\n",
" TEMPLATE_SPARK_ARGS = [\n",
" \"--template=JDBCTOJDBC\",\n",
" \"--jdbctojdbc.input.url={}\".format(JDBC_INPUT_URL),\n",
" \"--jdbctojdbc.input.driver={}\".format(JDBC_INPUT_DRIVER),\n",
" \"--jdbctojdbc.input.table={}.{}\".format(ORACLE_SCHEMA, table),\n",
" \"--jdbctojdbc.input.fetchsize={}\".format(JDBC_FETCH_SIZE),\n",
" \"--jdbctojdbc.output.url={}\".format(JDBC_OUTPUT_URL),\n",
" \"--jdbctojdbc.output.driver={}\".format(JDBC_OUTPUT_DRIVER),\n",
" \"--jdbctojdbc.output.table={}.{}\".format(POSTGRES_SCHEMA, table),\n",
" \"--jdbctojdbc.input.partitioncolumn={}\".format(partition_options[jdbc_input_manager_interface.SPARK_PARTITION_COLUMN]),\n",
" \"--jdbctojdbc.input.lowerbound={}\".format(partition_options[jdbc_input_manager_interface.SPARK_LOWER_BOUND]),\n",
" \"--jdbctojdbc.input.upperbound={}\".format(partition_options[jdbc_input_manager_interface.SPARK_UPPER_BOUND]),\n",
" \"--jdbctojdbc.numpartitions={}\".format(partition_options[jdbc_input_manager_interface.SPARK_NUM_PARTITIONS]),\n",
" \"--jdbctojdbc.output.mode={}\".format(JDBCTOJDBC_OUTPUT_MODE),\n",
" \"--jdbctojdbc.output.batch.size={}\".format(\n",
" JDBCTOJDBC_OUTPUT_BATCH_SIZE\n",
" ),\n",
" ]\n",
" else:\n",
" TEMPLATE_SPARK_ARGS = [\n",
" \"--template=JDBCTOJDBC\",\n",
" \"--jdbctojdbc.input.url={}\".format(JDBC_INPUT_URL),\n",
" \"--jdbctojdbc.input.driver={}\".format(JDBC_INPUT_DRIVER),\n",
" \"--jdbctojdbc.input.table={}.{}\".format(ORACLE_SCHEMA, table),\n",
" \"--jdbctojdbc.input.fetchsize={}\".format(JDBC_FETCH_SIZE),\n",
" \"--jdbctojdbc.output.url={}\".format(JDBC_OUTPUT_URL),\n",
" \"--jdbctojdbc.output.driver={}\".format(JDBC_OUTPUT_DRIVER),\n",
" \"--jdbctojdbc.output.table={}.{}\".format(POSTGRES_SCHEMA, table),\n",
" \"--jdbctojdbc.output.mode={}\".format(JDBCTOJDBC_OUTPUT_MODE),\n",
" \"--jdbctojdbc.output.batch.size={}\".format(\n",
" JDBCTOJDBC_OUTPUT_BATCH_SIZE\n",
" ),\n",
" ]\n",
" if JDBC_SESSION_INIT_STATEMENT:\n",
" TEMPLATE_SPARK_ARGS.append(\"--jdbc.bigquery.input.sessioninitstatement={}\".format(JDBC_SESSION_INIT_STATEMENT))\n",
"\n",
" _ = DataprocPySparkBatchOp(\n",
" project=PROJECT_ID,\n",
" location=LOCATION,\n",
" batch_id=BATCH_ID,\n",
" main_python_file_uri=MAIN_PYTHON_CLASS,\n",
" jar_file_uris=JAR_FILE_URIS,\n",
" python_file_uris=PYTHON_FILE_URIS,\n",
" subnetwork_uri=SUBNETWORK_URI,\n",
" service_account=SERVICE_ACCOUNT,\n",
" args=TEMPLATE_SPARK_ARGS,\n",
" )\n",
" time.sleep(3)\n",
"\n",
" compiler.Compiler().compile(pipeline_func=pipeline, package_path=\"pipeline.json\")\n",
"\n",
" pipeline = aiplatform.PipelineJob(\n",
" display_name=\"pipeline\",\n",
" template_path=\"pipeline.json\",\n",
" pipeline_root=PIPELINE_ROOT,\n",
" enable_caching=False,\n",
" )\n",
" pipeline.run(service_account=SERVICE_ACCOUNT)"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "44205b54-1ac7-42f3-85ad-5b20f531056b",
"metadata": {
"scrolled": true,
"tags": []
},
"outputs": [],
"source": [
"for execution_list in JOB_LIST:\n",
" print(execution_list)\n",
" migrate_oracle_to_postgres(execution_list)"
]
},
{
"attachments": {},
"cell_type": "markdown",
"id": "9ce7f828-dacc-404b-8927-dc3813e7216a",
"metadata": {},
"source": [
"## Step 10: Get status for tables migrated from Oracle to PostgreSQL\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "b611510f-271c-447a-899d-42fbb983268d",
"metadata": {
"tags": []
},
"outputs": [],
"source": [
"def get_bearer_token():\n",
" try:\n",
" # Defining Scope\n",
" CREDENTIAL_SCOPES = [\"https://www.googleapis.com/auth/cloud-platform\"]\n",
"\n",
" # Assining credentials and project value\n",
" credentials, project_id = google.auth.default(scopes=CREDENTIAL_SCOPES)\n",
"\n",
" # Refreshing credentials data\n",
" credentials.refresh(requests.Request())\n",
"\n",
" # Get refreshed token\n",
" token = credentials.token\n",
" if token:\n",
" return (token, 200)\n",
" else:\n",
" return \"Bearer token not generated\"\n",
" except Exception as error:\n",
" return (\"Bearer token not generated. Error : {}\".format(error), 500)"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "d1fcbc63-19db-42a8-a2ed-d9855da00c04",
"metadata": {
"tags": []
},
"outputs": [],
"source": [
"from google.auth.transport import requests\n",
"import google\n",
"\n",
"token = get_bearer_token()\n",
"if token[1] == 200:\n",
" print(\"Bearer token generated\")\n",
"else:\n",
" print(token)"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "5be3cf87-6d28-4b23-8466-87d3399f7a29",
"metadata": {
"tags": []
},
"outputs": [],
"source": [
"import requests\n",
"\n",
"oracle_to_pg_status = []\n",
"job_status_url = (\n",
" \"https://dataproc.googleapis.com/v1/projects/{}/locations/{}/batches/{}\"\n",
")\n",
"for job in oracle_to_postgres_jobs:\n",
" auth = \"Bearer \" + token[0]\n",
" url = job_status_url.format(PROJECT, REGION, job)\n",
" headers = {\"Content-Type\": \"application/json; charset=UTF-8\", \"Authorization\": auth}\n",
" response = requests.get(url, headers=headers)\n",
" oracle_to_pg_status.append(response.json()[\"state\"])"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "1097575d-07c2-4659-a75f-d7e898e3f077",
"metadata": {
"tags": []
},
"outputs": [],
"source": [
"statusDF = pd.DataFrame(\n",
" {\n",
" \"table\": ORACLE_TABLE_LIST,\n",
" \"oracle_to_postgres_job\": oracle_to_postgres_jobs,\n",
" \"oracle_to_postgres_status\": oracle_to_pg_status,\n",
" }\n",
")\n",
"statusDF"
]
},
{
"attachments": {},
"cell_type": "markdown",
"id": "0961f164-c7e4-4bb5-80f0-25fd1051147b",
"metadata": {},
"source": [
"## Step 11: Validate row counts of migrated tables from Oracle to PostgreSQL\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "25299344-c167-4764-a5d1-56c1b384d104",
"metadata": {
"tags": []
},
"outputs": [],
"source": [
"# Get Oracle table counts\n",
"oracle_row_count = input_mgr.get_table_list_with_counts()"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "ab0e539d-5180-4f5b-915e-35f7ea45e0d3",
"metadata": {
"tags": []
},
"outputs": [],
"source": [
"postgres_row_count = []\n",
"conn=postgresDB.cursor()\n",
"for table in ORACLE_TABLE_LIST:\n",
" conn.execute('select count(*) from {}.{}'.format(POSTGRES_SCHEMA, table))\n",
" results = conn.fetchall()\n",
" for row in results:\n",
" postgres_row_count.append(row[0])\n",
"conn.close()"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "4b1afe12-3eb9-4133-8377-66dc63ac649c",
"metadata": {
"tags": []
},
"outputs": [],
"source": [
"statusDF[\"oracle_row_count\"] = oracle_row_count\n",
"statusDF[\"postgres_row_count\"] = postgres_row_count\n",
"statusDF"
]
}
],
"metadata": {
"environment": {
"kernel": "python3",
"name": "common-cpu.m104",
"type": "gcloud",
"uri": "gcr.io/deeplearning-platform-release/base-cpu:m104"
},
"kernelspec": {
"display_name": "Python 3 (ipykernel) (Local)",
"language": "python",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.9.9"
},
"vscode": {
"interpreter": {
"hash": "31f2aee4e71d21fbe5cf8b01ff0e069b9275f58929596ceb00d14d90e3e16cd6"
}
}
},
"nbformat": 4,
"nbformat_minor": 5
}