In [None]:
# Copyright 2022 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

# <center>Oracle to BigQuery</center>
<table align="left">
<td>
    <a href="https://colab.research.google.com/github/GoogleCloudPlatform/dataproc-templates/blob/main/notebooks/oracle2bq/OracleToBigQuery_notebook.ipynb">
        <img src="../images/colab-logo-32px.png" alt="Colab logo" />Run in Colab
    </a>
</td>
<td>
    <a href="https://console.cloud.google.com/vertex-ai/colab/import/https:%2F%2Fraw.githubusercontent.com%2FGoogleCloudPlatform%2Fdataproc-templates%2Fmain%2Fnotebooks%2Foracle2bq%2FOracleToBigQuery_notebook.ipynb">
        <img src="../images/colab-enterprise-logo-32px.png" alt="GCP Colab Enterprise logo" />Run in Colab Enterprise
    </a>
</td>
<td>
    <a href="https://github.com/GoogleCloudPlatform/dataproc-templates/blob/main/notebooks/oracle2bq/OracleToBigQuery_notebook.ipynb">
        <img src="../images/github-logo-32px.png" alt="GitHub logo" />View on GitHub
    </a>
</td>
<td>
    <a href="https://console.cloud.google.com/vertex-ai/workbench/deploy-notebook?download_url=https://raw.githubusercontent.com/GoogleCloudPlatform/dataproc-templates/main/notebooks/oracle2bq/OracleToBigQuery_notebook.ipynb">
        <img src="../images/vertexai.jpg" alt="Vertex AI logo" />Open in Vertex AI Workbench
    </a>
</td>
</table>

#### References

- [DataprocPySparkBatchOp reference](https://google-cloud-pipeline-components.readthedocs.io/en/google-cloud-pipeline-components-1.0.0/google_cloud_pipeline_components.experimental.dataproc.html)
- [Kubeflow SDK Overview](https://www.kubeflow.org/docs/components/pipelines/sdk/sdk-overview/)
- [Dataproc Serverless in Vertex AI Pipelines tutorial](https://github.com/GoogleCloudPlatform/vertex-ai-samples/blob/main/notebooks/community/ml_ops/stage3/get_started_with_dataproc_serverless_pipeline_components.ipynb)
- [Build a Vertex AI Pipeline](https://cloud.google.com/vertex-ai/docs/pipelines/build-pipeline)

#### Overview - Oracle to Bigquery Migration
This notebook walks through the step-by-step process of migrating Oracle database tables to BigQuery using Dataproc Templates.
##### Feedback
Share your feedback, ideas, and thoughts via the [feedback form](https://forms.gle/XXCJeWeCJJ9fNLQS6).  
Questions, issues, and comments should be directed to dataproc-templates-support-external@googlegroups.com

#### Permissions
This notebook is built to run in a Vertex AI User-Managed Notebook using the default Compute Engine Service Account.  
Check the Dataproc Serverless in Vertex AI Pipelines tutorial linked above to learn how to set up a different Service Account. If you use a custom service account, the service account attached to the Vertex AI notebook must have the Service Account User role so that it can use the custom service account in the job.

Make sure that the service account used to run the notebook has the following roles:

- roles/aiplatform.serviceAgent
- roles/aiplatform.customCodeServiceAgent
- roles/storage.objectCreator
- roles/storage.objectViewer
- roles/dataproc.editor
- roles/dataproc.worker
- roles/bigquery.dataEditor

## Step 1: Install Libraries
#### Run Step 1 one time for each new notebook instance

In [None]:
# Install the Python packages this notebook imports: SQLAlchemy (database engine),
# the Vertex AI pipeline components plus the KFP SDK, and cx-Oracle
# (presumably the driver behind the oracle:// SQLAlchemy URL - confirm).
! pip3 install SQLAlchemy
! pip3 install --upgrade google-cloud-pipeline-components kfp --user -q
! pip3 install cx-Oracle

#### Oracle client Installation

In [None]:
%%bash
# Download the Oracle Instant Client into /opt/oracle and register its
# directory with the dynamic linker so the Oracle Python driver can load it.
sudo mkdir -p /opt/oracle
# Remove any earlier download/extraction so the unzip below starts clean.
sudo rm -fr /opt/oracle/instantclient*
cd /opt/oracle
sudo wget --no-verbose https://download.oracle.com/otn_software/linux/instantclient/instantclient-basic-linuxx64.zip
sudo unzip instantclient-basic-linuxx64.zip
# The archive extracts into a versioned directory (instantclient_<major>_<minor>);
# take the last one in sort order.
INSTANT_CLIENT_DIR=$(find /opt/oracle -maxdepth 1 -type d -name "instantclient_[0-9]*_[0-9]*" | sort | tail -1)
if [ -z "${INSTANT_CLIENT_DIR}" ]; then
    echo "ERROR: Could not find instant client"
    exit 1
fi
# -y: this cell has no interactive terminal, so never wait for a confirmation prompt.
sudo apt-get install -y libaio1
# Make the client libraries discoverable system-wide via ldconfig.
sudo sh -c "echo ${INSTANT_CLIENT_DIR} > /etc/ld.so.conf.d/oracle-instantclient.conf"
sudo ldconfig
# NOTE: this export only applies to the bash process running this cell.
export LD_LIBRARY_PATH=${INSTANT_CLIENT_DIR}:$LD_LIBRARY_PATH

#### Once you've installed the additional packages, you may need to restart the notebook kernel so it can find the packages.

Uncomment & run this cell if you have installed anything from above commands

In [None]:
# import os
# import IPython
# if not os.getenv("IS_TESTING"):
#     app = IPython.Application.instance()
#     app.kernel.do_shutdown(True)

## Step 2: Import Libraries

In [None]:
from datetime import datetime
import os
from pathlib import Path
import sys
import time
from urllib.parse import quote

import google.cloud.aiplatform as aiplatform
from kfp import dsl
from kfp import compiler

try:
    from google_cloud_pipeline_components.experimental.dataproc import DataprocPySparkBatchOp
except ModuleNotFoundError:
    from google_cloud_pipeline_components.v1.dataproc import DataprocPySparkBatchOp
    
import pandas as pd
import sqlalchemy

module_path = os.path.abspath(os.pardir)
if module_path not in sys.path:
    sys.path.append(module_path)

from util.jdbc.jdbc_input_manager import JDBCInputManager
from util.jdbc import jdbc_input_manager_interface
from util import notebook_functions

## Step 3: Assign Parameters

### Step 3.1 Common Parameters
 
- PROJECT : GCP project-id
- REGION : GCP region
- GCS_STAGING_LOCATION : Cloud Storage staging location to be used for this notebook to store artifacts
- SUBNET : VPC subnet
- JARS : List of jars. For this notebook Oracle JDBC driver and BigQuery connector are required in addition to the dataproc template jars
- MAX_PARALLELISM : Number of jobs to run in parallel (default value is 5)
- SERVICE_ACCOUNT : Custom service account email to use for vertex ai pipeline and dataproc job with above mentioned permissions

In [None]:
# When True, the parameter cells below skip their placeholder assignments, so
# PROJECT, REGION, the ORACLE_* values, etc. must already be defined by whatever
# is running the notebook. It also changes how WORKING_DIRECTORY is derived.
IS_PARAMETERIZED = False

In [None]:
# Placeholder values for interactive use; replace them before running.
# The whole block is skipped when IS_PARAMETERIZED is True.
if not IS_PARAMETERIZED:
    PROJECT = "<project-id>"
    REGION = "<region>"
    GCS_STAGING_LOCATION = "gs://path"
    SUBNET = "projects/{project}/regions/{region}/subnetworks/{subnet}"
    MAX_PARALLELISM = 5
    SERVICE_ACCOUNT = "" # leave blank to use default service account

# File name of the Oracle JDBC driver jar; it is also used to build the jar's
# location under GCS_STAGING_LOCATION in JARS below.
OJDBC_JAR = "ojdbc8-21.7.0.0.jar" # For Oracle 11g use ojdbc6-11.2.0.4.jar
# Do not change this parameter unless you want to refer below JARS from new location
JARS = [f"{GCS_STAGING_LOCATION}/jars/{OJDBC_JAR}", f"{GCS_STAGING_LOCATION}/jars/spark-bigquery-with-dependencies_2.12-0.27.0.jar"]

In [None]:
# If SERVICE_ACCOUNT is not specified it will take the one attached to Notebook
if SERVICE_ACCOUNT == '':
    shell_output = !gcloud auth list 2>/dev/null
    SERVICE_ACCOUNT = shell_output[2].replace("*", "").strip()
    print("Service Account: ",SERVICE_ACCOUNT)

### Step 3.2 Oracle to BigQuery Parameters
- ORACLE_HOST: Oracle instance ip address
- ORACLE_PORT: Oracle instance port
- ORACLE_USERNAME: Oracle username
- ORACLE_PASSWORD: Oracle password
- ORACLE_DATABASE: Name of database/service for Oracle connection
- ORACLE_SCHEMA: Schema to be exported, leave blank to export tables owned by ORACLE_USERNAME
- ORACLE_TABLE_LIST: List of tables to migrate, e.g.: ['table1', 'table2']; provide an empty list, e.g.: [], to migrate all tables in the schema
- ORACLE_READ_PARTITION_COLUMNS: Dictionary of custom read partition columns, e.g.: {'table2': 'secondary_id'}

In [None]:
# Oracle connection settings for interactive use; skipped when IS_PARAMETERIZED
# is True (the values must then already be defined).
# NOTE(review): ORACLE_PASSWORD is stored in plain text in the notebook - avoid
# saving or committing the notebook with a real password filled in.
if not IS_PARAMETERIZED:
    ORACLE_HOST = ""
    ORACLE_PORT = "1521"
    ORACLE_USERNAME = ""
    ORACLE_PASSWORD = ""
    ORACLE_DATABASE = ""
    ORACLE_SCHEMA = ""  # Leave empty to default to ORACLE_USERNAME
    ORACLE_TABLE_LIST = []  # Leave list empty for migrating complete ORACLE_SCHEMA else provide tables as ['table1', 'table2']
    ORACLE_READ_PARTITION_COLUMNS = {} # Leave empty for default read partition columns

### Step 3.3 Notebook Configuration Parameters
Below variables should not be changed unless required
- ORACLE_URL: Oracle Python URL
- JDBC_DRIVER: JDBC driver class
- JDBC_URL: Oracle JDBC URL
- JDBC_FETCH_SIZE: Determines how many rows to fetch per round trip
- JDBC_SESSION_INIT_STATEMENT: Custom SQL statement to execute in each reader database session
- MAIN_CLASS: Dataproc Template Main Class
- WORKING_DIRECTORY: Python working directory
- PACKAGE_EGG_FILE: Dataproc Template distribution file
- PIPELINE_ROOT: Path to Vertex AI pipeline artifacts

In [None]:
# Derive the "python" folder from the notebook's current directory:
# one level up for parameterized runs, two levels up otherwise.
cur_path = Path(os.getcwd())
python_parent_dir = cur_path.parent if IS_PARAMETERIZED else cur_path.parent.parent
WORKING_DIRECTORY = os.path.join(python_parent_dir, 'python')

# If the path printed below is not the "python" folder of your cloned
# dataproc-templates repo, set the complete path explicitly instead:

# WORKING_DIRECTORY = "/home/jupyter/dataproc-templates/python/"
print(WORKING_DIRECTORY)

In [None]:
# ORACLE_URL is parsed by SQLAlchemy as a URL, so the credentials are
# percent-encoded: characters such as "@", "/" or ":" in a password would
# otherwise be read as URL delimiters. Plain alphanumeric values are unchanged.
ORACLE_URL = "oracle://{}:{}@{}:{}?service_name={}".format(
    quote(str(ORACLE_USERNAME), safe=""),
    quote(str(ORACLE_PASSWORD), safe=""),
    ORACLE_HOST,
    ORACLE_PORT,
    ORACLE_DATABASE,
)
JDBC_DRIVER = "oracle.jdbc.OracleDriver"
# NOTE(review): the JDBC thin URL embeds the raw credentials; passwords that
# contain "/" or "@" may need different handling here - confirm with the
# Oracle JDBC documentation.
JDBC_URL = "jdbc:oracle:thin:{}/{}@{}:{}/{}".format(ORACLE_USERNAME, ORACLE_PASSWORD, ORACLE_HOST, ORACLE_PORT, ORACLE_DATABASE)
# Rows fetched per round trip by the JDBC reader.
JDBC_FETCH_SIZE = 200
# SQL executed in each reader session; it tags the session with a module/action name.
JDBC_SESSION_INIT_STATEMENT = "BEGIN DBMS_APPLICATION_INFO.SET_MODULE('Dataproc Templates','OracleToBigQuery Notebook'); END;"
MAIN_CLASS = "com.google.cloud.dataproc.templates.main.DataProcTemplate"
PACKAGE_EGG_FILE = "dataproc_templates_distribution.egg"
# Vertex AI pipeline artifacts are written below the staging location.
PIPELINE_ROOT = GCS_STAGING_LOCATION + "/pipeline_root/dataproc_pyspark"

## Step 4: Generate Oracle Table List
This step creates list of tables for migration. If ORACLE_TABLE_LIST is kept empty all the tables in ORACLE_SCHEMA are listed

In [None]:
# Open a SQLAlchemy engine for Oracle and hand it to the JDBC input manager.
oracle_engine = sqlalchemy.create_engine(ORACLE_URL)
input_mgr = JDBCInputManager.create("oracle", oracle_engine)

# Let the input manager resolve the tables and schema to migrate, using the
# user-supplied schema and table list as filters.
ORACLE_TABLE_LIST = input_mgr.build_table_list(
    schema_filter=ORACLE_SCHEMA,
    table_filter=ORACLE_TABLE_LIST,
)
ORACLE_SCHEMA = input_mgr.get_schema()
table_total = len(ORACLE_TABLE_LIST)
print(f"Total tables to migrate from schema {ORACLE_SCHEMA}:", table_total)

print("List of tables for migration:", ORACLE_TABLE_LIST, sep="\n")

## Step 5: Identify Read Partition Columns
This step uses PARTITION_THRESHOLD (default value is 1 million) parameter and any table having rows greater than PARTITION_THRESHOLD will be used for partitioned read based on Primary Keys
 - PARTITION_OPTIONS: List will have table and its partitioned column and Spark SQL settings if exceeds the threshold

In [None]:
# Row-count threshold passed to the input manager when it decides which tables
# get partitioned-read settings.
PARTITION_THRESHOLD = 1_000_000
PARTITION_OPTIONS = input_mgr.define_read_partitioning(PARTITION_THRESHOLD, custom_partition_columns=ORACLE_READ_PARTITION_COLUMNS)
# Display the resulting partitioning settings as the cell output.
input_mgr.read_partitioning_df(PARTITION_OPTIONS)

## Step 6: Download JAR files and Upload to Cloud Storage (only required to run one-time)
#### Run Step 6 one time for each new notebook instance

In [None]:
# Switch the notebook's working directory to WORKING_DIRECTORY; the cells below
# use relative paths (./setup.py, main.py and the downloaded jars).
%cd $WORKING_DIRECTORY

#### Downloading JDBC Oracle Driver and Bigquery Spark Connector Jar files

In [None]:
# Turn the jar file name into its Maven path segment,
# e.g. "ojdbc8-21.7.0.0.jar" -> "ojdbc8/21.7.0.0".
OJDBC_PATH = os.path.splitext(OJDBC_JAR)[0].replace("-", "/")
# Download the Oracle JDBC driver and the Spark BigQuery connector from Maven Central.
! wget --no-verbose https://repo1.maven.org/maven2/com/oracle/database/jdbc/$OJDBC_PATH/$OJDBC_JAR
! wget --no-verbose https://repo1.maven.org/maven2/com/google/cloud/spark/spark-bigquery-with-dependencies_2.12/0.27.0/spark-bigquery-with-dependencies_2.12-0.27.0.jar

In [None]:
# Build the egg distribution with setup.py from the current directory, passing
# PACKAGE_EGG_FILE as the --output name.
! python3 ./setup.py bdist_egg --output=$PACKAGE_EGG_FILE

#### Copying JARS files to GCS_STAGING_LOCATION

In [None]:
# Upload the job entry point and the egg to <staging>/dependencies/, and both
# jars to <staging>/jars/ - the locations referenced by JARS, PYTHON_FILE_URIS
# and MAIN_PYTHON_CLASS.
! gsutil cp main.py $GCS_STAGING_LOCATION/dependencies/
! gsutil cp -r $PACKAGE_EGG_FILE $GCS_STAGING_LOCATION/dependencies/
! gsutil cp spark-bigquery-with-dependencies_2.12-0.27.0.jar $GCS_STAGING_LOCATION/jars/spark-bigquery-with-dependencies_2.12-0.27.0.jar
! gsutil cp $OJDBC_JAR $GCS_STAGING_LOCATION/jars/$OJDBC_JAR

## Step 7: Calculate Parallel Jobs for Oracle to BigQuery
This step uses MAX_PARALLELISM parameter to calculate number of parallel jobs to run

In [None]:
# Split the tables known to the input manager into execution batches using
# MAX_PARALLELISM (see notebook_functions.split_list for the exact chunking).
JOB_LIST = notebook_functions.split_list(input_mgr.get_table_list(), MAX_PARALLELISM)
print("List of tables for execution:", JOB_LIST, sep="\n")

## Step 8: Execute Pipeline to Migrate Tables from Oracle to BigQuery
- BIGQUERY_DATASET : Target dataset in Bigquery
- BIGQUERY_MODE : Mode of operation at target `<append|overwrite|ignore|errorifexists>` (default overwrite)
- TEMP_GCS_BUCKET : Bucket name for dataproc job staging
- PYTHON_FILE_URIS : Path to PACKAGE_EGG_FILE
- MAIN_PYTHON_CLASS : Path to main.py

In [None]:
# Batch ids generated by migrate_oracle_to_bigquery are collected in this list;
# re-running this cell resets it.
oracle_to_gcs_jobs = []

# Target settings for interactive use; skipped when IS_PARAMETERIZED is True.
if not IS_PARAMETERIZED:
    BIGQUERY_DATASET="<bigquery-dataset-name>"
    BIGQUERY_MODE = "overwrite"  # append/overwrite
    TEMP_GCS_BUCKET="<temp-bucket-name>"
# Reference the egg through PACKAGE_EGG_FILE so this URI always matches the
# file name that is built and uploaded to <staging>/dependencies/.
PYTHON_FILE_URIS = [ GCS_STAGING_LOCATION + "/dependencies/" + PACKAGE_EGG_FILE ]
MAIN_PYTHON_CLASS = GCS_STAGING_LOCATION + "/dependencies/main.py"

In [None]:
def migrate_oracle_to_bigquery(EXECUTION_LIST):
    """Build, compile and run one Vertex AI pipeline for a batch of Oracle tables.

    The pipeline contains one DataprocPySparkBatchOp per table in
    EXECUTION_LIST, each invoking the JDBCTOBIGQUERY template. Tables that
    have an entry in PARTITION_OPTIONS additionally receive the partitioned
    read arguments. Every generated batch id is appended to the notebook-level
    ``oracle_to_gcs_jobs`` list.

    Args:
        EXECUTION_LIST: Iterable of Oracle table names to migrate in this
            pipeline run.

    Side effects:
        Writes the compiled pipeline to ``pipeline.json`` in the current
        working directory and runs it as a Vertex AI PipelineJob.
    """
    aiplatform.init(project=PROJECT,staging_bucket=TEMP_GCS_BUCKET)

    @dsl.pipeline(
        name="python-oracle-to-bigquery-pyspark",
        description="Pipeline to get data from Oracle to BigQuery",
    )
    def pipeline(
        PROJECT_ID: str = PROJECT,
        LOCATION: str = REGION,
        MAIN_PYTHON_CLASS: str = MAIN_PYTHON_CLASS,
        JAR_FILE_URIS: list = JARS,
        SUBNETWORK_URI: str = SUBNET,
        SERVICE_ACCOUNT: str = SERVICE_ACCOUNT,
        PYTHON_FILE_URIS: list = PYTHON_FILE_URIS
    ):
        for table_name in EXECUTION_LIST:
            # Batch id = current epoch seconds. time.time() gives the same value
            # as strftime("%s") where that works, but "%s" is not a portable
            # format code.
            BATCH_ID = "oracle2bigquery-{}".format(int(time.time()))
            oracle_to_gcs_jobs.append(BATCH_ID)

            # Arguments common to every table.
            TEMPLATE_SPARK_ARGS = [
                "--template=JDBCTOBIGQUERY",
                "--jdbc.bigquery.input.url={}".format(JDBC_URL),
                "--jdbc.bigquery.input.driver={}".format(JDBC_DRIVER),
                "--jdbc.bigquery.input.table={}.{}".format(ORACLE_SCHEMA, table_name),
                "--jdbc.bigquery.input.fetchsize={}".format(JDBC_FETCH_SIZE),
                "--jdbc.bigquery.output.mode={}".format(BIGQUERY_MODE),
                "--jdbc.bigquery.output.table={}".format(table_name),
                "--jdbc.bigquery.temp.bucket.name={}".format(TEMP_GCS_BUCKET),
                "--jdbc.bigquery.output.dataset={}".format(BIGQUERY_DATASET),
            ]

            # Partitioned-read arguments, only for tables listed in PARTITION_OPTIONS.
            if table_name in PARTITION_OPTIONS:
                partition_options = PARTITION_OPTIONS[table_name]
                TEMPLATE_SPARK_ARGS.extend([
                    "--jdbc.bigquery.input.partitioncolumn={}".format(partition_options[jdbc_input_manager_interface.SPARK_PARTITION_COLUMN]),
                    "--jdbc.bigquery.input.lowerbound={}".format(partition_options[jdbc_input_manager_interface.SPARK_LOWER_BOUND]),
                    "--jdbc.bigquery.input.upperbound={}".format(partition_options[jdbc_input_manager_interface.SPARK_UPPER_BOUND]),
                    "--jdbc.bigquery.numpartitions={}".format(partition_options[jdbc_input_manager_interface.SPARK_NUM_PARTITIONS]),
                ])

            if JDBC_SESSION_INIT_STATEMENT:
                TEMPLATE_SPARK_ARGS.append("--jdbc.bigquery.input.sessioninitstatement={}".format(JDBC_SESSION_INIT_STATEMENT))

            _ = DataprocPySparkBatchOp(
                project=PROJECT_ID,
                location=LOCATION,
                batch_id=BATCH_ID,
                main_python_file_uri=MAIN_PYTHON_CLASS,
                jar_file_uris=JAR_FILE_URIS,
                python_file_uris=PYTHON_FILE_URIS,
                subnetwork_uri=SUBNETWORK_URI,
                service_account=SERVICE_ACCOUNT,
                runtime_config_version="1.1", # issue 665
                args=TEMPLATE_SPARK_ARGS
                )
            # Batch ids have one-second resolution; pause so the next table
            # gets a different id.
            time.sleep(3)

    compiler.Compiler().compile(pipeline_func=pipeline, package_path="pipeline.json")

    # Named pipeline_job so the local name `pipeline` keeps referring to the
    # pipeline function defined above.
    pipeline_job = aiplatform.PipelineJob(
        display_name="pipeline",
        template_path="pipeline.json",
        pipeline_root=PIPELINE_ROOT,
        enable_caching=False,
    )
    pipeline_job.run(service_account=SERVICE_ACCOUNT)

In [None]:
# Submit one pipeline per batch of tables, printing each batch before it is run.
for table_batch in JOB_LIST:
    print(table_batch)
    migrate_oracle_to_bigquery(table_batch)

## Step 9: Get Status for Tables Migrated from Oracle to BigQuery

In [None]:
def get_bearer_token():
    """Fetch an OAuth access token for the application default credentials.

    Returns:
        A 2-tuple in every case:
        ``(token, 200)`` when a token was obtained, otherwise
        ``(error_message, 500)``.
    """
    try:
        # Imported locally so the function does not depend on which cell last
        # bound the global name `requests` (the notebook also imports the
        # unrelated `requests` HTTP library under that name).
        import google.auth
        from google.auth.transport import requests as google_auth_requests

        # Scope requested for the credentials.
        credential_scopes = ["https://www.googleapis.com/auth/cloud-platform"]

        # Application default credentials; the project id is not needed here.
        credentials, _ = google.auth.default(scopes=credential_scopes)

        # Refresh the credentials so that a current token is available.
        credentials.refresh(google_auth_requests.Request())

        token = credentials.token
        if token:
            return (token, 200)
        # Same (message, status) shape as the other paths, so callers can
        # always inspect index 1.
        return ("Bearer token not generated", 500)
    except Exception as error:
        return ("Bearer token not generated. Error : {}".format(error), 500)

In [None]:
# Make the Google auth modules available in the notebook namespace, then
# request a token and report the outcome.
from google.auth.transport import requests
import google
token = get_bearer_token()
# On success only a confirmation is printed; otherwise the returned value is shown.
print("Bearer token generated" if token[1] == 200 else token)

In [None]:
# Imported under an alias: the global name `requests` is already bound to
# google.auth.transport.requests by the token cell and must not be rebound.
import requests as http_requests

oracle_to_bq_status = []
job_status_url = "https://dataproc.googleapis.com/v1/projects/{}/locations/{}/batches/{}"

# The headers are the same for every batch, so build them once.
auth = "Bearer " + token[0]
headers = {
  'Content-Type': 'application/json; charset=UTF-8',
  'Authorization': auth
}

for job in oracle_to_gcs_jobs:
    url = job_status_url.format(PROJECT,REGION,job)
    response = http_requests.get(url, headers=headers)
    batch_info = response.json()
    # A response without "state" is recorded as "HTTP <status code>" so the
    # status list keeps exactly one entry per job.
    oracle_to_bq_status.append(batch_info.get('state', "HTTP {}".format(response.status_code)))

In [None]:
# One row per table: table name, Dataproc batch id and the batch state fetched above.
status_columns = {
    "table" : ORACLE_TABLE_LIST,
    "oracle_to_gcs_job" : oracle_to_gcs_jobs,
    "oracle_to_bq_status" : oracle_to_bq_status,
}
statusDF = pd.DataFrame(status_columns)
statusDF

## Step 10: Validate Row Counts of Migrated Tables from Oracle to BigQuery

In [None]:
# Get Oracle table counts
# NOTE(review): the result is later stored as the 'oracle_row_count' column of
# statusDF, so it is presumably one count per table in ORACLE_TABLE_LIST order -
# confirm against JDBCInputManager.get_table_list_with_counts.
oracle_row_count = input_mgr.get_table_list_with_counts()

In [None]:
from google.cloud import bigquery

# Construct a BigQuery client object.
client = bigquery.Client()

# Collect exactly one entry per table in ORACLE_TABLE_LIST. A table that is
# missing from the dataset yields None, so the list stays aligned with the
# table list when it is added as a column of statusDF.
bq_row_count = []
for table in ORACLE_TABLE_LIST:
    results = client.query("SELECT row_count FROM {}.__TABLES__ where table_id = '{}'".format(BIGQUERY_DATASET, table))
    matching_rows = list(results)
    bq_row_count.append(matching_rows[0][0] if matching_rows else None)

In [None]:
# Add the source and target row counts as columns of the status table so they
# can be compared per table; each list needs one entry per row of statusDF.
statusDF['oracle_row_count'] = oracle_row_count 
statusDF['bq_row_count'] = bq_row_count 
statusDF