In [None]:
# Copyright 2022 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

# <center>Oracle to Cloud Spanner</center>
<table align="left">
<td>
    <a href="https://colab.research.google.com/github/GoogleCloudPlatform/dataproc-templates/blob/main/notebooks/oracle2spanner/OracleToSpanner_notebook.ipynb">
        <img src="../images/colab-logo-32px.png" alt="Colab logo" />Run in Colab
    </a>
</td>
<td>
    <a href="https://console.cloud.google.com/vertex-ai/colab/import/https:%2F%2Fraw.githubusercontent.com%2FGoogleCloudPlatform%2Fdataproc-templates%2Fmain%2Fnotebooks%2Foracle2spanner%2FOracleToSpanner_notebook.ipynb">
        <img src="../images/colab-enterprise-logo-32px.png" alt="GCP Colab Enterprise logo" />Run in Colab Enterprise
    </a>
</td>
<td>
    <a href="https://github.com/GoogleCloudPlatform/dataproc-templates/blob/main/notebooks/oracle2spanner/OracleToSpanner_notebook.ipynb">
        <img src="../images/github-logo-32px.png" alt="GitHub logo" />View on GitHub
    </a>
</td>
<td>
    <a href="https://console.cloud.google.com/vertex-ai/workbench/deploy-notebook?download_url=https://raw.githubusercontent.com/GoogleCloudPlatform/dataproc-templates/main/notebooks/oracle2spanner/OracleToSpanner_notebook.ipynb">
        <img src="../images/vertexai.jpg" alt="Vertex AI logo" />Open in Vertex AI Workbench
    </a>
</td>
</table>

#### References

- [DataprocPySparkBatchOp reference](https://google-cloud-pipeline-components.readthedocs.io/en/google-cloud-pipeline-components-1.0.0/google_cloud_pipeline_components.experimental.dataproc.html)
- [Kubeflow SDK Overview](https://www.kubeflow.org/docs/components/pipelines/sdk/sdk-overview/)
- [Dataproc Serverless in Vertex AI Pipelines tutorial](https://github.com/GoogleCloudPlatform/vertex-ai-samples/blob/main/notebooks/community/ml_ops/stage3/get_started_with_dataproc_serverless_pipeline_components.ipynb)
- [Build a Vertex AI Pipeline](https://cloud.google.com/vertex-ai/docs/pipelines/build-pipeline)

#### Overview - Oracle to Cloud Spanner Migration
This notebook walks through the step-by-step process of migrating Oracle database tables to Cloud Spanner using a Dataproc template. This notebook solution uses the [JDBCTOSPANNER](../../java/src/main/java/com/google/cloud/dataproc/templates/jdbc/README.md) Spark (Java) template. The migration will create the Spanner tables if they do not exist, and it will overwrite or append based on the write mode configured in the notebook.
##### Feedback
Share your feedback, ideas, and thoughts via the [feedback form](https://forms.gle/XXCJeWeCJJ9fNLQS6). 
Questions, issues, and comments should be directed to dataproc-templates-support-external@googlegroups.com

#### Permissions
This notebook is built to run in a Vertex AI User-Managed Notebook using the default Compute Engine Service Account.  
Check the Dataproc Serverless in Vertex AI Pipelines tutorial linked above to learn how to set up a different Service Account. If you use a custom service account, the service account attached to the Vertex AI notebook must have the Service Account User role so that it can use the custom service account in the job.

Make sure that the service account used to run the notebook has the following roles:

- roles/aiplatform.serviceAgent
- roles/aiplatform.customCodeServiceAgent
- roles/storage.objectCreator
- roles/storage.objectViewer
- roles/dataproc.editor
- roles/dataproc.worker
- roles/bigquery.dataEditor

## Step 1: Install Libraries
#### Run Step 1 one time for each new notebook instance

In [None]:
! pip3 install SQLAlchemy
! pip3 install --upgrade google-cloud-pipeline-components kfp --user -q
! pip3 install cx-Oracle
! pip3 install google-cloud-spanner

In [None]:
# Install the JDK and Maven; Maven is used in Step 7 to build the Dataproc Templates JAR.
!sudo apt-get update -y
!sudo apt-get install default-jdk -y
!sudo apt-get install maven -y

#### Oracle client Installation

In [None]:
%%bash
# Download the Oracle Instant Client into /opt/oracle and register it with the
# dynamic linker.

sudo mkdir -p /opt/oracle
# Remove any earlier download/extraction so that the cell can be re-run.
sudo rm -fr /opt/oracle/instantclient*
# Abort instead of downloading into an unexpected directory.
cd /opt/oracle || exit 1
sudo wget --no-verbose https://download.oracle.com/otn_software/linux/instantclient/instantclient-basic-linuxx64.zip
sudo unzip instantclient-basic-linuxx64.zip
# The extracted directory name contains the client version; take the last one in sort order.
INSTANT_CLIENT_DIR=$(find /opt/oracle -maxdepth 1 -type d -name "instantclient_[0-9]*_[0-9]*" | sort | tail -1)
if [ -z "${INSTANT_CLIENT_DIR}" ]; then
    # Also reached when the download or the unzip above failed.
    echo "ERROR: Could not find instant client"
    exit 1
fi
# -y: the cell has no interactive terminal on which to confirm the installation.
sudo apt-get install -y libaio1
sudo sh -c "echo '${INSTANT_CLIENT_DIR}' > /etc/ld.so.conf.d/oracle-instantclient.conf"
sudo ldconfig
# NOTE: this export only affects this bash subprocess, not the notebook kernel;
# the ldconfig registration above is what makes the library discoverable.
export LD_LIBRARY_PATH="${INSTANT_CLIENT_DIR}:${LD_LIBRARY_PATH:-}"

#### Once you've installed the additional packages, you may need to restart the notebook kernel so it can find the packages.

Uncomment & run this cell if you have installed anything from above commands

In [None]:
# import os
# import IPython
# if not os.getenv("IS_TESTING"):
#     app = IPython.Application.instance()
#     app.kernel.do_shutdown(True)

## Step 2: Import Libraries

In [None]:
from datetime import datetime
import os
from pathlib import Path
import sys
import time
from urllib.parse import quote_plus

import google.cloud.aiplatform as aiplatform
from kfp import dsl
from kfp import compiler

try:
    from google_cloud_pipeline_components.experimental.dataproc import DataprocSparkBatchOp
except ModuleNotFoundError:
    from google_cloud_pipeline_components.v1.dataproc import DataprocSparkBatchOp
    
import pandas as pd
import sqlalchemy

# Put the parent of the current working directory on the import path
# (the `util` imports that follow are expected to resolve from there).
module_path = os.path.abspath(os.path.join(os.getcwd(), os.pardir))
if sys.path.count(module_path) == 0:
    sys.path.append(module_path)

from util.jdbc.jdbc_input_manager import JDBCInputManager
from util.jdbc import jdbc_input_manager_interface
from util import notebook_functions

## Step 3: Assign Parameters

### Step 3.1 Common Parameters
 
- PROJECT : GCP project-id
- REGION : GCP region
- GCS_STAGING_LOCATION : Cloud Storage staging location to be used for this notebook to store artifacts
- SUBNET : VPC subnet
- JARS : list of JARs. For this notebook, the Oracle connector is required in addition to the Dataproc template JARs
- MAX_PARALLELISM : Number of jobs to run in parallel; the default value is 5
- SERVICE_ACCOUNT : Custom service account email to use for vertex ai pipeline and dataproc job with above mentioned permissions

In [None]:
# Replace every "<...>" placeholder below with a real value before running.
PROJECT = "<project-id>"
REGION = "<region>"
GCS_STAGING_LOCATION = "<gs://bucket/[folder]>"
SUBNET = "<projects/{project}/regions/{region}/subnetworks/{subnet}>"
# Used in Step 8 to split the table list into batches of jobs.
MAX_PARALLELISM = 5
# Set to "" to fall back to the account reported by gcloud (resolved in the next cell).
SERVICE_ACCOUNT = "<Service-Account>"

OJDBC_JAR = "ojdbc8-21.7.0.0.jar" # For Oracle 11g use ojdbc6-11.2.0.4.jar
# Do not change this parameter unless you want to refer below JARS from new location
JARS = [f"{GCS_STAGING_LOCATION}/jars/{OJDBC_JAR}"]

In [None]:
# If SERVICE_ACCOUNT is not specified it will take the one attached to Notebook
if SERVICE_ACCOUNT == "":
    shell_output = !gcloud auth list 2>/dev/null
    SERVICE_ACCOUNT = shell_output[2].replace("*", "").strip()
    print("Service Account: ",SERVICE_ACCOUNT)

### Step 3.2 Oracle to Cloud Spanner Parameters
- ORACLE_HOST: Oracle instance ip address
- ORACLE_PORT: Oracle instance port
- ORACLE_USERNAME: Oracle username
- ORACLE_PASSWORD: Oracle password
- ORACLE_DATABASE: Name of database/service for Oracle connection
- ORACLE_SCHEMA: Schema to be exported, leave blank to export tables owned by ORACLE_USERNAME
- ORACLE_TABLE_LIST: List of tables to migrate, e.g. ['table1', 'table2']; provide an empty list ([]) to migrate the whole database
- ORACLE_READ_PARTITION_COLUMNS: Dictionary of custom read partition columns, e.g.: {'table2': 'secondary_id'}
- SPANNER_INSTANCE: Cloud Spanner instance name
- SPANNER_DATABASE: Cloud Spanner database name
- SPANNER_TABLE_PRIMARY_KEYS: Dictionary of format {"table_name": "primary_key_column1,primary_key_column2"} for tables which do not have primary key in Oracle
- SPANNER_OUTPUT_MODE: <Append | Overwrite>

In [None]:
# Source (Oracle) connection settings. Replace every "<...>" placeholder.
ORACLE_HOST = "<host-ip>"
ORACLE_PORT = "<port>"
ORACLE_USERNAME = "<username>"
# NOTE(review): the password is kept in plain text in the notebook and is later
# embedded in ORACLE_URL and JDBC_URL; avoid committing or sharing a filled-in copy.
ORACLE_PASSWORD = "<password>"
ORACLE_DATABASE = "<database>"
ORACLE_SCHEMA = "" # Leave empty to default to ORACLE_USERNAME
ORACLE_TABLE_LIST = [] # Leave list empty for migrating complete database else provide tables as ['table1','table2']
ORACLE_READ_PARTITION_COLUMNS = {} # Leave empty for default read partition columns

# Target (Cloud Spanner) settings.
SPANNER_OUTPUT_MODE = "<Overwrite | Append>"
SPANNER_INSTANCE = "<spanner-instance>"
SPANNER_DATABASE = "<spanner-database>"
SPANNER_TABLE_PRIMARY_KEYS = {} # Provide tables which do not have PK in Oracle {"table_name":"primary_key_column1,primary_key_column2"}

### Step 3.3 Notebook Configuration Parameters
Below variables should not be changed unless required
- ORACLE_URL: Oracle Python URL
- JDBC_DRIVER: JDBC driver class
- JDBC_URL: Oracle JDBC URL
- JDBC_FETCH_SIZE: Determines how many rows to fetch per round trip
- JDBC_SESSION_INIT_STATEMENT: Custom SQL statement to execute in each reader database session
- MAIN_CLASS: Dataproc Template Main Class
- WORKING_DIRECTORY: Java working directory
- JAR_FILE: Dataproc Template distribution JAR file
- PIPELINE_ROOT: Path to Vertex AI pipeline artifacts

In [None]:
# The Java templates live in the "java" folder two levels above the
# directory the notebook is running from.
cur_path = Path.cwd()
WORKING_DIRECTORY = str(cur_path.parent.parent / 'java')

# If the path printed below is not correct, uncomment the next assignment
# and set it to the complete path of the java folder inside your clone of
# the dataproc-templates repo.

# WORKING_DIRECTORY = "/home/jupyter/dataproc-templates/java/"
print(WORKING_DIRECTORY)

In [None]:
# SQLAlchemy URL for the Python-side metadata queries. The credentials are
# URL-encoded because characters such as "@", "/" or ":" in a username or
# password would otherwise be parsed as URL delimiters.
ORACLE_URL = "oracle://{}:{}@{}:{}?service_name={}".format(
    quote_plus(ORACLE_USERNAME),
    quote_plus(ORACLE_PASSWORD),
    ORACLE_HOST,
    ORACLE_PORT,
    ORACLE_DATABASE,
)
JDBC_DRIVER = "oracle.jdbc.OracleDriver"
# JDBC URL handed to the Spark job (contains the credentials in plain text).
JDBC_URL = "jdbc:oracle:thin:{}/{}@{}:{}/{}".format(ORACLE_USERNAME, ORACLE_PASSWORD, ORACLE_HOST, ORACLE_PORT, ORACLE_DATABASE)
JDBC_FETCH_SIZE = 200
JDBC_SESSION_INIT_STATEMENT = "BEGIN DBMS_APPLICATION_INFO.SET_MODULE('Dataproc Templates','OracleToSpanner Notebook'); END;"
MAIN_CLASS = "com.google.cloud.dataproc.templates.main.DataProcTemplate"
JAR_FILE = "dataproc-templates-1.0-SNAPSHOT.jar"
LOG4J_PROPERTIES_PATH = "./src/test/resources"
LOG4J_PROPERTIES = "log4j-spark-driver-template.properties"
PIPELINE_ROOT = GCS_STAGING_LOCATION + "/pipeline_root/dataproc_pyspark"

# Adding Dataproc template JAR.
# Guarded so that re-running this cell does not add the same JAR a second time.
_template_jar_uri = GCS_STAGING_LOCATION + "/" + JAR_FILE
if _template_jar_uri not in JARS:
    JARS.append(_template_jar_uri)

## Step 4: Generate Oracle Table List
This step creates list of tables for migration. If ORACLE_TABLE_LIST is kept empty all the tables in the ORACLE_DATABASE are listed for migration otherwise the provided list is used

In [None]:
# Create the Oracle input manager on top of a SQLAlchemy engine for ORACLE_URL.
input_mgr = JDBCInputManager.create("oracle", sqlalchemy.create_engine(ORACLE_URL))

# Retrieve list of tables from database.
# NOTE: ORACLE_TABLE_LIST and ORACLE_SCHEMA are overwritten here with the values
# returned by the input manager, so re-running this cell feeds the previous
# results back in as filters.
ORACLE_TABLE_LIST = input_mgr.build_table_list(schema_filter=ORACLE_SCHEMA, table_filter=ORACLE_TABLE_LIST)
ORACLE_SCHEMA = input_mgr.get_schema()
print(f"Total tables to migrate from schema {ORACLE_SCHEMA}:", len(ORACLE_TABLE_LIST))

print("List of tables for migration:")
print(ORACLE_TABLE_LIST)

## Step 5: Get Primary Keys for Tables Not Present in SPANNER_TABLE_PRIMARY_KEYS
For tables which do not have primary key provided in dictionary SPANNER_TABLE_PRIMARY_KEYS this step fetches primary key from ORACLE_DATABASE

In [None]:
# Feed every (table, primary key columns) pair reported by the input manager
# into SPANNER_TABLE_PRIMARY_KEYS through the notebook helper.
oracle_primary_keys = input_mgr.get_primary_keys().items()
for table_name, pk_columns in oracle_primary_keys:
    notebook_functions.update_spanner_primary_keys(
        SPANNER_TABLE_PRIMARY_KEYS,
        table_name,
        pk_columns,
    )

# Let the helper drop entries that do not belong to the tables being migrated.
notebook_functions.remove_unexpected_spanner_primary_keys(
    SPANNER_TABLE_PRIMARY_KEYS,
    ORACLE_TABLE_LIST,
)

In [None]:
# Show each table next to its primary key entry (None when there is no entry).
pkDF = pd.DataFrame(
    {
        "table": ORACLE_TABLE_LIST,
        "primary_keys": list(map(SPANNER_TABLE_PRIMARY_KEYS.get, ORACLE_TABLE_LIST)),
    }
)
print("Below are identified primary keys for migrating Oracle table to Spanner:")
pkDF

## Step 6: Identify Read Partition Columns
This step uses PARTITION_THRESHOLD (default value is 1 million) parameter and any table having rows greater than PARTITION_THRESHOLD will be used for partitioned read based on Primary Keys
 - PARTITION_OPTIONS: List will have table and its partitioned column and Spark SQL settings if exceeds the threshold

In [None]:
# Row-count threshold handed to the input manager when it decides which
# tables get partitioned-read options.
PARTITION_THRESHOLD = 1_000_000
PARTITION_OPTIONS = input_mgr.define_read_partitioning(PARTITION_THRESHOLD, custom_partition_columns=ORACLE_READ_PARTITION_COLUMNS)
input_mgr.read_partitioning_df(PARTITION_OPTIONS)

## Step 7: Download JAR files and Upload to Cloud Storage (only required to run one-time)
#### Run Step 7 one time for each new notebook instance

In [None]:
# Change the kernel's working directory to the Java template folder; the build
# and upload commands below use paths relative to it.
# NOTE: earlier cells derive paths from the current working directory, so
# re-running them after this cell produces different values.
%cd $WORKING_DIRECTORY

#### Downloading the Oracle JDBC driver and executing the Maven build

In [None]:
# Turn "<artifact>-<version>.jar" into the "<artifact>/<version>" path segment
# used in the Maven repository URL below.
OJDBC_PATH = os.path.splitext(OJDBC_JAR)[0].replace("-", "/")
# Download the Oracle JDBC driver JAR into the current directory.
!wget --no-verbose https://repo1.maven.org/maven2/com/oracle/database/jdbc/$OJDBC_PATH/$OJDBC_JAR
# Build the templates JAR with Maven, skipping the tests.
!mvn -q clean spotless:apply install -DskipTests 

#### Copying JARS files to GCS_STAGING_LOCATION

In [None]:
# Upload the built template JAR, the log4j properties file and the Oracle JDBC
# driver to the staging bucket; these are the URIs referenced by JARS and by the
# pipeline's FILE_URIS.
!gsutil cp target/$JAR_FILE $GCS_STAGING_LOCATION/$JAR_FILE
!gsutil cp $LOG4J_PROPERTIES_PATH/$LOG4J_PROPERTIES $GCS_STAGING_LOCATION/$LOG4J_PROPERTIES
!gsutil cp $OJDBC_JAR $GCS_STAGING_LOCATION/jars/$OJDBC_JAR

## Step 8: Calculate Parallel Jobs for Oracle to Cloud Spanner
This step uses MAX_PARALLELISM parameter to calculate number of parallel jobs to run

In [None]:
# Group the tables into the lists that are executed one pipeline at a time
# (grouping is delegated to notebook_functions.split_list with MAX_PARALLELISM).
JOB_LIST = notebook_functions.split_list(
    input_mgr.get_table_list(),
    MAX_PARALLELISM,
)
print("List of tables for execution:", JOB_LIST, sep="\n")

## Step 9: Execute Pipeline to Migrate Tables from Oracle to Cloud Spanner

In [None]:
# Batch IDs of the submitted Dataproc jobs; migrate_oracle_to_spanner appends to
# this list and Step 10 reads it to look up each job's status.
oracle_to_spanner_jobs = []

In [None]:
def migrate_oracle_to_spanner(EXECUTION_LIST):
    """Compile and run one Vertex AI pipeline that migrates the given tables.

    For every table a DataprocSparkBatchOp running the JDBCTOSPANNER template
    is added to the pipeline. The batch ID of each job is appended to the
    notebook-level list ``oracle_to_spanner_jobs`` so that its status can be
    looked up afterwards. The compiled pipeline is written to ``pipeline.json``
    in the current directory and then run.

    Args:
        EXECUTION_LIST: Iterable of Oracle table names to migrate in this run.

    Raises:
        KeyError: If a table has no entry in SPANNER_TABLE_PRIMARY_KEYS.
    """
    aiplatform.init(project=PROJECT, staging_bucket=GCS_STAGING_LOCATION)

    @dsl.pipeline(
        name="java-oracle-to-spanner-spark",
        description="Pipeline to get data from Oracle to Cloud Spanner",
    )
    def pipeline(
        PROJECT_ID: str = PROJECT,
        LOCATION: str = REGION,
        MAIN_CLASS: str = MAIN_CLASS,
        JAR_FILE_URIS: list = JARS,
        SUBNETWORK_URI: str = SUBNET,
        SERVICE_ACCOUNT: str = SERVICE_ACCOUNT,
        FILE_URIS: list = [GCS_STAGING_LOCATION + "/" + LOG4J_PROPERTIES]
    ):
        for table in EXECUTION_LIST:
            # Fail with an actionable message (and before recording a batch ID)
            # when no primary key is known for the table.
            if table not in SPANNER_TABLE_PRIMARY_KEYS:
                raise KeyError(
                    "No primary key defined for table {}; add it to SPANNER_TABLE_PRIMARY_KEYS".format(table)
                )

            # Epoch seconds via time.time(): strftime("%s") is a platform
            # extension that is not available everywhere.
            BATCH_ID = "ora2spanner-{}-{}".format(table, int(time.time())).replace('_', '-').lower()
            oracle_to_spanner_jobs.append(BATCH_ID)

            # Arguments shared by partitioned and non-partitioned reads.
            TEMPLATE_SPARK_ARGS = [
                "--template=JDBCTOSPANNER",
                "--templateProperty", "project.id={}".format(PROJECT),
                "--templateProperty", "jdbctospanner.jdbc.url={}".format(JDBC_URL),
                "--templateProperty", "jdbctospanner.jdbc.driver.class.name={}".format(JDBC_DRIVER),
                "--templateProperty", "jdbctospanner.jdbc.fetchsize={}".format(JDBC_FETCH_SIZE),
                "--templateProperty", "jdbctospanner.sql=select * from {}.{}".format(ORACLE_SCHEMA, table),
                "--templateProperty", "jdbctospanner.output.instance={}".format(SPANNER_INSTANCE),
                "--templateProperty", "jdbctospanner.output.database={}".format(SPANNER_DATABASE),
                "--templateProperty", "jdbctospanner.output.table={}".format(table),
                "--templateProperty", "jdbctospanner.output.saveMode={}".format(SPANNER_OUTPUT_MODE),
                "--templateProperty", "jdbctospanner.output.primaryKey={}".format(SPANNER_TABLE_PRIMARY_KEYS[table]),
                "--templateProperty", "jdbctospanner.output.batchInsertSize=200",
            ]

            # Tables with an entry in PARTITION_OPTIONS additionally get the
            # partitioned-read settings.
            if table in PARTITION_OPTIONS:
                partition_options = PARTITION_OPTIONS[table]
                TEMPLATE_SPARK_ARGS.extend([
                    "--templateProperty", "jdbctospanner.sql.partitionColumn={}".format(partition_options[jdbc_input_manager_interface.SPARK_PARTITION_COLUMN]),
                    "--templateProperty", "jdbctospanner.sql.lowerBound={}".format(partition_options[jdbc_input_manager_interface.SPARK_LOWER_BOUND]),
                    "--templateProperty", "jdbctospanner.sql.upperBound={}".format(partition_options[jdbc_input_manager_interface.SPARK_UPPER_BOUND]),
                    "--templateProperty", "jdbctospanner.sql.numPartitions={}".format(partition_options[jdbc_input_manager_interface.SPARK_NUM_PARTITIONS]),
                ])

            if JDBC_SESSION_INIT_STATEMENT:
                # NOTE(review): unlike every other setting this is passed as a
                # bare "--key=value" argument rather than as a
                # "--templateProperty" pair; confirm that the template accepts
                # this form and this property name.
                TEMPLATE_SPARK_ARGS.append("--jdbctospanner.input.sessioninitstatement={}".format(JDBC_SESSION_INIT_STATEMENT))

            _ = DataprocSparkBatchOp(
                project=PROJECT_ID,
                location=LOCATION,
                batch_id=BATCH_ID,
                main_class=MAIN_CLASS,
                jar_file_uris=JAR_FILE_URIS,
                file_uris=FILE_URIS,
                subnetwork_uri=SUBNETWORK_URI,
                service_account=SERVICE_ACCOUNT,
                runtime_config_version="1.1", # issue 665
                args=TEMPLATE_SPARK_ARGS
            )
            # Pause kept from the original implementation.
            # NOTE(review): presumably spaces out the timestamps used in the
            # batch IDs; confirm whether it is still needed.
            time.sleep(3)

    compiler.Compiler().compile(pipeline_func=pipeline, package_path="pipeline.json")

    # Separate name so the pipeline function above is not rebound to the job object.
    pipeline_job = aiplatform.PipelineJob(
        display_name="pipeline",
        template_path="pipeline.json",
        pipeline_root=PIPELINE_ROOT,
        enable_caching=False,
    )
    # run() method has an optional parameter `service_account` which you can pass if you want to run pipeline using
    # specific service account instead of default service account
    # eg. pipeline_job.run(service_account='test@project_id.iam.gserviceaccount.com')
    pipeline_job.run()

In [None]:
# Start from an empty job list so that re-running this cell does not keep batch
# IDs from a previous run: the status table in Step 10 pairs
# oracle_to_spanner_jobs one-to-one with ORACLE_TABLE_LIST.
oracle_to_spanner_jobs.clear()
for execution_list in JOB_LIST:
    print(execution_list)
    migrate_oracle_to_spanner(execution_list)

## Step 10: Get Status for Tables Migrated from Oracle to Cloud Spanner

In [None]:
def get_bearer_token():
    """Fetch an access token for the default Google credentials.

    Returns:
        tuple: ``(token, 200)`` on success, otherwise ``(error_message, 500)``.
        The result is always a 2-tuple, so callers can safely test ``result[1]``.
    """
    try:
        # Imported locally so the function does not depend on the notebook-level
        # names `google` and `requests`; the latter is rebound by a later
        # `import requests` cell to a different library.
        import google.auth
        from google.auth.transport import requests as google_auth_requests

        # Defining scope
        CREDENTIAL_SCOPES = ["https://www.googleapis.com/auth/cloud-platform"]

        # Assigning credentials (the project value returned alongside is not needed)
        credentials, _ = google.auth.default(scopes=CREDENTIAL_SCOPES)

        # Refreshing credentials data
        credentials.refresh(google_auth_requests.Request())

        # Get refreshed token
        token = credentials.token
        if token:
            return (token, 200)
        return ("Bearer token not generated", 500)
    except Exception as error:
        return ("Bearer token not generated. Error : {}".format(error), 500)

In [None]:
# Google auth libraries used for obtaining the bearer token.
from google.auth.transport import requests
import google
# get_bearer_token() reports success with 200 in the second element of its result.
token = get_bearer_token()
if token[1] == 200:
    print("Bearer token generated")
else:
    # Show whatever was returned so the failure reason is visible.
    print(token)

In [None]:
# NOTE: this rebinds the notebook-level name `requests` (previously
# google.auth.transport.requests) to the HTTP `requests` library.
import requests

oracle_to_spanner_status = []
job_status_url = "https://dataproc.googleapis.com/v1/projects/{}/locations/{}/batches/{}"
# The headers are identical for every job, so they are built once.
headers = {
  'Content-Type': 'application/json; charset=UTF-8',
  'Authorization': "Bearer " + token[0]
}
for job in oracle_to_spanner_jobs:
    url = job_status_url.format(PROJECT,REGION,job)
    # A timeout keeps the cell from hanging forever on a stalled connection.
    response = requests.get(url, headers=headers, timeout=60)
    # A response without a 'state' key (for example an error payload) is recorded
    # with its HTTP status instead of raising KeyError and aborting the loop.
    state = response.json().get('state', "UNKNOWN (HTTP {})".format(response.status_code))
    oracle_to_spanner_status.append(state)

In [None]:
# Combine table name, Dataproc batch ID and job state into a single table.
statusDF = pd.DataFrame(
    {
        "table": ORACLE_TABLE_LIST,
        "oracle_to_spanner_job": oracle_to_spanner_jobs,
        "oracle_to_spanner_status": oracle_to_spanner_status,
    }
)
statusDF

## Step 11: Validate Row Counts of Migrated Tables from Oracle to Cloud Spanner

In [None]:
# Get Oracle table counts
# NOTE(review): the method name suggests the result may hold tables together
# with their counts rather than bare counts; confirm its shape, since it is
# assigned directly to a DataFrame column in the last cell.
oracle_row_count = input_mgr.get_table_list_with_counts()

In [None]:
# Get Cloud Spanner table counts
from google.cloud import spanner

# Pin the client to PROJECT (the project the migration jobs were given) instead
# of whatever default project the notebook environment resolves to.
spanner_client = spanner.Client(project=PROJECT)
instance = spanner_client.instance(SPANNER_INSTANCE)
database = instance.database(SPANNER_DATABASE)

spanner_row_count = []
for table in ORACLE_TABLE_LIST:
    # A fresh snapshot is opened for every table's count query.
    with database.snapshot() as snapshot:
        results = snapshot.execute_sql("select count(*) from {}".format(table))
        for row in results:
            spanner_row_count.append(row[0])

In [None]:
# Add the source and target row counts as new columns of statusDF (the frame
# from Step 10 is modified in place). Both values are expected to line up
# with the rows of statusDF, i.e. follow the order of ORACLE_TABLE_LIST.
statusDF['oracle_row_count'] = oracle_row_count 
statusDF['spanner_row_count'] = spanner_row_count 
statusDF