notebooks/hive2bq/HiveToBigquery_notebook.ipynb (932 lines of code) (raw):
{
"cells": [
{
"cell_type": "code",
"execution_count": null,
"id": "a3f55fbb-2197-4038-88c5-6c896a9f071a",
"metadata": {},
"outputs": [],
"source": [
"# Copyright 2022 Google LLC\n",
"#\n",
"# Licensed under the Apache License, Version 2.0 (the \"License\");\n",
"# you may not use this file except in compliance with the License.\n",
"# You may obtain a copy of the License at\n",
"#\n",
"# https://www.apache.org/licenses/LICENSE-2.0\n",
"#\n",
"# Unless required by applicable law or agreed to in writing, software\n",
"# distributed under the License is distributed on an \"AS IS\" BASIS,\n",
"# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.\n",
"# See the License for the specific language governing permissions and\n",
"# limitations under the License."
]
},
{
"cell_type": "markdown",
"id": "3fef31ac",
"metadata": {},
"source": [
"# <center>Hive to BigQuery Migration\n",
"<table align=\"left\">\n",
"<td>\n",
" <a href=\"https://colab.research.google.com/github/GoogleCloudPlatform/dataproc-templates/blob/main/notebooks/hive2bq/HiveToBigquery_notebook.ipynb\">\n",
" <img src=\"../images/colab-logo-32px.png\" alt=\"Colab logo\" />Run in Colab\n",
" </a>\n",
"</td>\n",
"<td>\n",
" <a href=\"https://console.cloud.google.com/vertex-ai/colab/import/https:%2F%2Fraw.githubusercontent.com%2FGoogleCloudPlatform%2Fdataproc-templates%2Fmain%2Fnotebooks%2Fhive2bq%2FHiveToBigquery_notebook.ipynb\">\n",
" <img src=\"../images/colab-enterprise-logo-32px.png\" alt=\"GCP Colab Enterprise logo\" />Run in Colab Enterprise\n",
" </a>\n",
"</td>\n",
"<td>\n",
" <a href=\"https://github.com/GoogleCloudPlatform/dataproc-templates/blob/main/notebooks/hive2bq/HiveToBigquery_notebook.ipynb\">\n",
" <img src=\"../images/github-logo-32px.png\" alt=\"GitHub logo\" />View on GitHub\n",
" </a>\n",
"</td>\n",
"<td>\n",
" <a href=\"https://console.cloud.google.com/vertex-ai/workbench/deploy-notebook?download_url=https://raw.githubusercontent.com/GoogleCloudPlatform/dataproc-templates/main/notebooks/hive2bq/HiveToBigquery_notebook.ipynb\">\n",
" <img src=\"../images/vertexai.jpg\" alt=\"Vertex AI logo\" />Open in Vertex AI Workbench\n",
" </a>\n",
"</td>\n",
"</table>"
]
},
{
"attachments": {},
"cell_type": "markdown",
"id": "fb9eff8d-6b63-4e8f-b369-68cbb4ef04ee",
"metadata": {},
"source": [
"#### References\n",
"\n",
"- [DataprocPySparkBatchOp reference](https://google-cloud-pipeline-components.readthedocs.io/en/google-cloud-pipeline-components-1.0.0/google_cloud_pipeline_components.experimental.dataproc.html)\n",
"- [Kubeflow SDK Overview](https://www.kubeflow.org/docs/components/pipelines/sdk/sdk-overview/)\n",
"- [Dataproc Serverless in Vertex AI Pipelines tutorial](https://github.com/GoogleCloudPlatform/vertex-ai-samples/blob/main/notebooks/community/ml_ops/stage3/get_started_with_dataproc_serverless_pipeline_components.ipynb)\n",
"- [Build a Vertex AI Pipeline](https://cloud.google.com/vertex-ai/docs/pipelines/build-pipeline)\n",
"\n",
"This notebook is built to run a Vertex AI User-Managed Notebook using the default Compute Engine Service Account. \n",
"Check the Dataproc Serverless in Vertex AI Pipelines tutorial linked above to learn how to setup a different Service Account. "
]
},
{
"attachments": {},
"cell_type": "markdown",
"id": "0852a764-dad3-4f3b-b0b5-a71825469fd3",
"metadata": {},
"source": [
"#### Permissions\n",
"\n",
"Make sure that the service account used to run the notebook has the following roles:\n",
"\n",
"- roles/aiplatform.serviceAgent\n",
"- roles/aiplatform.customCodeServiceAgent\n",
"- roles/storage.objectCreator\n",
"- roles/storage.objectViewer\n",
"- roles/dataproc.editor\n",
"- roles/dataproc.worker"
]
},
{
"attachments": {},
"cell_type": "markdown",
"id": "c640fa29-1a04-4301-974d-9fcec95b7e7c",
"metadata": {
"tags": []
},
"source": [
"#### Step 1:\n",
"#### Install the required packages"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "b3742b9a-143d-49fc-b43c-e56179c7f0f2",
"metadata": {
"tags": []
},
"outputs": [],
"source": [
"# Google Cloud notebooks requires dependencies to be installed with '--user'\n",
"! pip3 install pyspark\n",
"! pip3 install --upgrade google-cloud-pipeline-components kfp --user -q\n",
"! pip3 install pip install google-auth==2.13.0\n",
"! pip3 install --upgrade google-cloud-bigquery-migration\n",
"\n",
"# Install latest JDK\n",
"! sudo apt-get update\n",
"! sudo apt-get install default-jdk -y"
]
},
{
"attachments": {},
"cell_type": "markdown",
"id": "58552a67-9012-4ba9-82e9-d34299cd6d15",
"metadata": {},
"source": [
"#### Once you've installed the additional packages, you may need to restart the notebook kernel so it can find the packages.\n",
"\n",
"Uncomment & Run this cell if you have installed anything from above commands"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "01c19b5e-e7d9-416f-ad12-edc19b6877e6",
"metadata": {},
"outputs": [],
"source": [
"# import os\n",
"\n",
"# if not os.getenv(\"IS_TESTING\"):\n",
"# import IPython\n",
"# app = IPython.Application.instance()\n",
"# app.kernel.do_shutdown(True)"
]
},
{
"attachments": {},
"cell_type": "markdown",
"id": "7a2fa95e-b745-4649-8040-af99e7a4013c",
"metadata": {},
"source": [
"#### Step 2:\n",
"#### Set Google Cloud properties"
]
},
{
"attachments": {},
"cell_type": "markdown",
"id": "920dd937-7a43-4709-8297-f9c346d7897c",
"metadata": {},
"source": [
"**Overview** \n",
"This notebook shows how to build a Vertex AI Pipeline to run a Dataproc Template \n",
"using the DataprocPySparkBatchOp component."
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "2deada21",
"metadata": {
"tags": [
"parameters"
]
},
"outputs": [],
"source": [
"IS_PARAMETERIZED = False"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "7e2ef85e-4cac-464c-9ed4-a66ea9c4f4c6",
"metadata": {
"tags": []
},
"outputs": [],
"source": [
"# User Configuration\n",
"# User Inputs\n",
"if not IS_PARAMETERIZED:\n",
" get_project_id = ! gcloud config list --format 'value(core.project)' 2>/dev/null\n",
" PROJECT = get_project_id[0]\n",
" REGION = \"\" # example \"us-west1\"\n",
" GCS_STAGING_LOCATION = \"gs://<bucket_name>\" # example \"gs://my_bucket_name\"\n",
" SUBNET = \"\" # example \"projects/<project-id>/regions/<region-id>/subnetworks/<subnet-name>\" \n",
" INPUT_HIVE_DATABASE= \"\"\n",
" INPUT_HIVE_TABLES= \"*\" # example \"table1,table2,table3...\" or \"*\"\n",
" OUTPUT_BIGQUERY_DATASET= \"\"\n",
" TEMP_BUCKET= \"<bucket_name>\"\n",
" HIVE_METASTORE= \"\" # example \"thrift://<my_hive_server>:9083\"\n",
" BQ_DATASET_REGION=\"us\"\n",
" DATAPROC_SERVICE_ACCOUNT = \"\" # eg: test@project_id.iam.gserviceaccount.com\n",
"\n",
" ## Change if needed\n",
" HIVE_OUTPUT_MODE=\"overwrite\"\n",
" MAX_PARALLELISM=10 # Controls number of parallel Dataproc Serverless Jobs"
]
},
{
"attachments": {},
"cell_type": "markdown",
"id": "454bb0de-3e0a-4daa-92ed-1319e1d9604d",
"metadata": {},
"source": [
"#### Step 3:\n",
"#### Import dependencies"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "c7aede36-3a0a-43f7-8e4c-db2d8087e289",
"metadata": {
"tags": []
},
"outputs": [],
"source": [
"import google.cloud.aiplatform as aiplatform\n",
"from kfp import dsl\n",
"from kfp import compiler\n",
"from datetime import datetime\n",
"\n",
"try:\n",
" from google_cloud_pipeline_components.experimental.dataproc import DataprocPySparkBatchOp\n",
"except ModuleNotFoundError:\n",
" from google_cloud_pipeline_components.v1.dataproc import DataprocPySparkBatchOp\n",
"\n",
"import time\n",
"import os\n",
"import sys\n",
"from pyspark.sql import SparkSession\n",
"import pandas as pd\n",
"from pathlib import Path\n",
"import subprocess\n",
"from google.cloud import bigquery\n",
"\n",
"module_path = os.path.abspath(os.pardir)\n",
"if module_path not in sys.path:\n",
" sys.path.append(module_path)\n",
"\n",
"from util.sql_translation import create_migration_workflow, get_migration_workflow_status"
]
},
{
"attachments": {},
"cell_type": "markdown",
"id": "d8b33331-b529-47ac-a08d-0c166acd264e",
"metadata": {},
"source": [
"#### Step 4:\n",
"#### Change working directory to the Dataproc Templates python folder"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "da510653",
"metadata": {
"tags": []
},
"outputs": [],
"source": [
"cur_path = Path(os.getcwd())\n",
"if IS_PARAMETERIZED:\n",
" WORKING_DIRECTORY = os.path.join(cur_path.parent ,'python')\n",
"else:\n",
" WORKING_DIRECTORY = os.path.join(cur_path.parent.parent ,'python')\n",
"\n",
"# If the above code doesn't fetches the correct path please\n",
"# provide complete path to python folder in your dataproc \n",
"# template repo which you cloned \n",
"\n",
"# WORKING_DIRECTORY = \"/home/jupyter/dataproc-templates/python/\"\n",
"print(WORKING_DIRECTORY)"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "d6e0e80a-e8b7-4e54-9f99-b7874e164978",
"metadata": {
"tags": []
},
"outputs": [],
"source": [
"%cd $WORKING_DIRECTORY"
]
},
{
"attachments": {},
"cell_type": "markdown",
"id": "e73bb252-c3e3-4d56-b1ad-f52a2db52869",
"metadata": {},
"source": [
"#### Step 5:\n",
"#### Build Dataproc Templates python package"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "cfaa8c93-5e69-48fc-984a-f4fa3b28519b",
"metadata": {
"tags": []
},
"outputs": [],
"source": [
"PACKAGE_EGG_FILE = \"dist/dataproc_templates_distribution.egg\"\n",
"! python ./setup.py bdist_egg --output=$PACKAGE_EGG_FILE"
]
},
{
"attachments": {},
"cell_type": "markdown",
"id": "b528c874-b948-40c1-b57c-afff76d80b47",
"metadata": {},
"source": [
"#### Step 6:\n",
"#### Copy package to the Cloud Storage bucket\n",
"\n",
"For this, make sure that the service account used to run the notebook has the following roles:\n",
" - roles/storage.objectCreator\n",
" - roles/storage.objectViewer"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "33dd85ce-46f6-4b35-8997-2189cf1b2eee",
"metadata": {
"tags": []
},
"outputs": [],
"source": [
"!gsutil cp main.py $GCS_STAGING_LOCATION/\n",
"!gsutil cp $PACKAGE_EGG_FILE $GCS_STAGING_LOCATION/dist/"
]
},
{
"attachments": {},
"cell_type": "markdown",
"id": "5a8d8c5f-b28b-4fbb-b585-cb2f1d9c1915",
"metadata": {},
"source": [
"#### Step 7:\n",
"#### Get Hive Tables \n",
"In case user wants to load all the Hive tables from the database, we need to get the table list using the metastore.\n",
"\n",
"Below cell will fetch all tables from the Hive database by running a Spark SQL query using the provided Hive Metastore."
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "3751abe3-47b7-4b4a-81f4-498da750d884",
"metadata": {
"tags": []
},
"outputs": [],
"source": [
"if INPUT_HIVE_TABLES==\"*\":\n",
" #os.environ[\"JAVA_HOME\"] = \"/usr/lib/jvm/java-11-openjdk-amd64\"\n",
" #os.environ[\"PATH\"] = os.environ[\"JAVA_HOME\"] + \"/bin:\" + os.environ[\"PATH\"]\n",
" spark=SparkSession.builder \\\n",
" .master(\"local\")\\\n",
" .appName(\"Spark Job to get HIVE table list\") \\\n",
" .config(\"hive.metastore.uris\",HIVE_METASTORE) \\\n",
" .enableHiveSupport() \\\n",
" .getOrCreate() \n",
" TABLE_LIST_DF=spark.sql(\"show tables in \"+INPUT_HIVE_DATABASE)\n",
" TABLE_LIST=TABLE_LIST_DF.select(\"tableName\").rdd.flatMap(lambda x: x).collect()\n",
" print(\"Table Sets to Migrate: \")\n",
" print(TABLE_LIST)\n",
" spark.stop()\n",
"else:\n",
" TABLE_LIST=INPUT_HIVE_TABLES.split(\",\")\n",
" print(\"Table Sets to Migrate: \")\n",
" print(TABLE_LIST)"
]
},
{
"attachments": {},
"cell_type": "markdown",
"id": "a0093181-7c2f-49ab-a071-a93c3a74c974",
"metadata": {},
"source": [
"#### ----- Skip steps 8-16 to create all HIVE tables non partitioned tables in Bigquery -----\n",
"\n",
"\n",
"#### Step 8:\n",
"#### Get Required Variables for HIVEDDLEXTRACTOR"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "d5fbc306-ca17-4bc7-a855-7afcc9ee2324",
"metadata": {
"tags": []
},
"outputs": [],
"source": [
"DDL_INPUT_PATH=GCS_STAGING_LOCATION+\"/hiveddl/input\"\n",
"DDL_OUTPUT_PATH=GCS_STAGING_LOCATION+\"/hiveddl/output\"\n",
"os.environ[\"GCP_PROJECT\"]=PROJECT\n",
"os.environ[\"REGION\"]=REGION\n",
"os.environ[\"GCS_STAGING_LOCATION\"]=GCS_STAGING_LOCATION\n",
"os.environ[\"SUBNET\"]=SUBNET\n",
"os.environ[\"HIVE_METASTORE\"]=HIVE_METASTORE\n",
"os.environ[\"INPUT_HIVE_DATABASE\"]=INPUT_HIVE_DATABASE\n",
"os.environ[\"GCS_STAGING_PATH\"]=DDL_INPUT_PATH"
]
},
{
"attachments": {},
"cell_type": "markdown",
"id": "1bc26c67-4d75-47dc-9a96-bed55035c25d",
"metadata": {},
"source": [
"#### Step 9:\n",
"#### Run HIVEDDLEXTRACTOR to extract HIVE DDLs\n",
"\n",
"We will be making use of HIVEDDLEXTRACTOR utility to connect to thrift server and extracting all the DDLs. "
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "36339d20-3039-4cd5-bba9-01a3d2bb6b4c",
"metadata": {
"tags": []
},
"outputs": [],
"source": [
"!./bin/start.sh \\\n",
" --properties=spark.hadoop.hive.metastore.uris=$HIVE_METASTORE \\\n",
" -- --template=HIVEDDLEXTRACTOR \\\n",
" --hive.ddl.extractor.input.database=$INPUT_HIVE_DATABASE \\\n",
" --hive.ddl.extractor.output.path=$GCS_STAGING_PATH\n"
]
},
{
"attachments": {},
"cell_type": "markdown",
"id": "805e7b49-4cda-4378-a5b7-ca89cf948c4f",
"metadata": {},
"source": [
"#### Step 10:\n",
"#### Get the latest DDL Cloud Storage path\n",
"\n",
"HIVEDDLEXTRACTOR utility will create seperate directory for each run based on the hive database name and run time."
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "76ed4d99-992a-4d2e-8d53-67055fcc60b9",
"metadata": {
"tags": []
},
"outputs": [],
"source": [
"result=subprocess.run(f\"gsutil ls {DDL_INPUT_PATH}/{INPUT_HIVE_DATABASE} | sed '$!d'\", capture_output=True, shell=True, encoding=\"utf-8\")\n",
"latest_ddl_path=str(result.stdout)[:-1]\n",
"print(latest_ddl_path)"
]
},
{
"attachments": {},
"cell_type": "markdown",
"id": "c5b99e70-63dd-4a05-b55f-8c43eeb8862d",
"metadata": {},
"source": [
"#### Step 12:\n",
"#### Copy Global Typeconvert file into the input folder\n",
"\n",
"Currently Translation API converts all the TIMESTAMP datatypes to DATETIME, Spark faces difficult copying TIMESTAMP Hive column to DATETIME Bigquery column. Global Typeconvert configuration will convert back all the DATETIME column to TIMESTAMP"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "396d7af7-fb5c-4cc0-84dd-064d54cfacc6",
"metadata": {
"tags": []
},
"outputs": [],
"source": [
"!gsutil cp ../notebooks/util/global_typeconvert.config.yaml \"$latest_ddl_path\""
]
},
{
"attachments": {},
"cell_type": "markdown",
"id": "d6ca5bb6-e812-4821-997c-f25d3f0043fc",
"metadata": {},
"source": [
"#### Step 13:\n",
"#### Create object name mapping\n",
"\n",
"Object name mapping will help us to replace HIVE database name with Bigquery Dataset in the final DDL"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "223f41c2-34ea-4be0-b4d4-49b13be80ce0",
"metadata": {
"tags": []
},
"outputs": [],
"source": [
"obj_name_mapping={\n",
" \"name_map\": [{\n",
" \"source\": {\n",
" \"schema\": INPUT_HIVE_DATABASE,\n",
" },\n",
" \"target\": {\n",
" \"schema\": OUTPUT_BIGQUERY_DATASET,\n",
" }\n",
" }, \n",
" ]\n",
" }"
]
},
{
"attachments": {},
"cell_type": "markdown",
"id": "eab93500-2446-4874-a5ba-925584075c0e",
"metadata": {},
"source": [
"#### Step 14:\n",
"#### Call create_migration_workflow to convert HIVE DDls to Bigquery syntax\n",
"\n",
"Below cell will call create_migration_workflow function to call BQ Translation API and then wait for the status to be completed.\n",
"\n",
"It will create the translated DDLs in \"gs://bucket_name/hiveddl/output/bq_dataset\""
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "47768a89-0bed-42d2-91b1-3f46ae258434",
"metadata": {
"tags": []
},
"outputs": [],
"source": [
"#Set required variables to be passed to the create_migration_workflow\n",
"gcs_input_path=latest_ddl_path\n",
"gcs_output_path=DDL_OUTPUT_PATH+\"/\"+OUTPUT_BIGQUERY_DATASET\n",
"project_id=PROJECT\n",
"bq_dataset=OUTPUT_BIGQUERY_DATASET\n",
"default_database=PROJECT\n",
"source_dilect=\"hive\"\n",
"bq_region=BQ_DATASET_REGION\n",
"\n",
"# Call create_migration_workflow with the required parameters\n",
"workflow_name,workflow_state=create_migration_workflow(\n",
" gcs_input_path, gcs_output_path, project_id, bq_dataset,\n",
" default_database, source_dilect, bq_region,obj_name_mapping)\n",
"\n",
"#Get Workflow status\n",
"while (str(workflow_state) == \"State.RUNNING\"):\n",
" print(\"Running Migration Workflow\")\n",
" time.sleep(5)\n",
" workflow_state=get_migration_workflow_status(workflow_name).state\n",
"print(str(workflow_state))"
]
},
{
"attachments": {},
"cell_type": "markdown",
"id": "9533ff6d-34a0-4e8f-b268-bdc475f90e69",
"metadata": {},
"source": [
"#### Step 14:\n",
"#### Extract translated DDLs\n",
"\n",
"Below cell will read all the translated Bigquery DDLs"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "8e65cefb-2e92-4fd6-8666-7a57644da25b",
"metadata": {
"tags": []
},
"outputs": [],
"source": [
"from google.cloud import storage\n",
"result=subprocess.run(f\"gsutil ls {gcs_output_path} \", capture_output=True, shell=True, encoding=\"utf-8\")\n",
"translated_files=result.stdout.split(\"\\n\")\n",
"all_ddls=\"\"\n",
"client = storage.Client()\n",
"for file in translated_files:\n",
" if \"/_SUCCESS\" not in file and \"batch_translation_report.csv\" not in file and \"consumed_name_map.json\" not in file and len(file)>0:\n",
" bucket_name=file.replace(\"gs://\",\"\").split(\"/\",1)[0]\n",
" file_path=file.replace(\"gs://\",\"\").split(\"/\",1)[1]\n",
" bucket = client.get_bucket(bucket_name)\n",
" blob = bucket.blob(file_path)\n",
" all_ddls=all_ddls+(blob.download_as_string().decode(\"utf-8\"))"
]
},
{
"attachments": {},
"cell_type": "markdown",
"id": "3e4cd771-8cc3-424d-b502-28849a702724",
"metadata": {
"collapsed": true,
"jupyter": {
"outputs_hidden": true
},
"tags": []
},
"source": [
"#### Step 15:\n",
"#### Run translated ddls and create BQ partitioned and clustered tables\n",
"\n",
"Below cell will try to run each bigquery DDL one by one and save the status in DDL_FAIL_AUDIT_DF dataframe"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "859c3439-2dc5-4399-b5d1-2e28983ece7f",
"metadata": {
"tags": []
},
"outputs": [],
"source": [
"DDL_FAIL_AUDIT_DICT={}\n",
"DDL_FAIL_AUDIT_DF = pd.DataFrame(columns=[\"Source_DB_Name\",\"Source_Table_Set\",\"Target_DB_Name\",\"Target_Table_Set\",\"Job_Start_Time\",\"Job_End_Time\",\"Job_Status\"])\n",
"client = bigquery.Client()\n",
"for ddl in all_ddls[:-1].split(\";\\n\"):\n",
" tblnm=ddl.split(\"CREATE TABLE\")[1].split(\"\\n\")[0].split(\".\")[2]\n",
" try:\n",
" job = client.query(ddl)\n",
" job.result()\n",
" print(f\"Table Created in bigquery: {OUTPUT_BIGQUERY_DATASET}.{tblnm}\")\n",
" except Exception as e:\n",
" print(f\"Failed to create table: {OUTPUT_BIGQUERY_DATASET}.{tblnm}\")\n",
" print(e)\n",
" DDL_FAIL_AUDIT_DICT[\"Source_DB_Name\"]=INPUT_HIVE_DATABASE\n",
" DDL_FAIL_AUDIT_DICT[\"Source_Table_Set\"]=tblnm\n",
" DDL_FAIL_AUDIT_DICT[\"Target_DB_Name\"]=OUTPUT_BIGQUERY_DATASET\n",
" DDL_FAIL_AUDIT_DICT[\"Target_Table_Set\"]=tblnm\n",
" DDL_FAIL_AUDIT_DICT[\"Job_Start_Time\"]=str(datetime.now())\n",
" DDL_FAIL_AUDIT_DICT[\"Job_Status\"]=f\"FAIL REASON: {e}\"\n",
" DDL_FAIL_AUDIT_DICT[\"Job_End_Time\"]=str(datetime.now())\n",
" DDL_FAIL_AUDIT_DF=DDL_FAIL_AUDIT_DF.append(DDL_FAIL_AUDIT_DICT, ignore_index = True)\n"
]
},
{
"attachments": {},
"cell_type": "markdown",
"id": "d90ab6ca-c378-4367-8fca-e5b62a28876d",
"metadata": {},
"source": [
"#### Step 16:\n",
"#### Remove failed tables from the final TABLE_LIST\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "8affd005-ae5e-4950-8ec4-673c87219aa4",
"metadata": {
"tags": []
},
"outputs": [],
"source": [
"import copy\n",
"FAILED_DDL_TBLS=DDL_FAIL_AUDIT_DF[['Target_Table_Set']].values.ravel().tolist()\n",
"TABLE_LIST_COPY=copy.deepcopy(TABLE_LIST)\n",
"for element in TABLE_LIST_COPY:\n",
" if element in FAILED_DDL_TBLS:\n",
" TABLE_LIST.remove(element)\n"
]
},
{
"attachments": {},
"cell_type": "markdown",
"id": "0dbaf266-91a0-46e2-8a88-b594b9301c83",
"metadata": {},
"source": [
"\n",
"#### Step 17:\n",
"\n",
"Split Hive Tables list based on MAX_PARALLELISM value provided by the user."
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "6b170e58-7acf-4aae-9619-2361eef0ed12",
"metadata": {
"tags": []
},
"outputs": [],
"source": [
"COMPLETE_LIST = copy.deepcopy(TABLE_LIST)\n",
"PARALLEL_JOBS = len(TABLE_LIST)//MAX_PARALLELISM\n",
"JOB_LIST = []\n",
"while len(COMPLETE_LIST) > 0:\n",
" SUB_LIST = []\n",
" for i in range(MAX_PARALLELISM):\n",
" if len(COMPLETE_LIST)>0 :\n",
" SUB_LIST.append(COMPLETE_LIST[0])\n",
" COMPLETE_LIST.pop(0)\n",
" else:\n",
" break\n",
" JOB_LIST.append(SUB_LIST)\n",
"print(\"List of tables for execution : \")\n",
"print(JOB_LIST)"
]
},
{
"attachments": {},
"cell_type": "markdown",
"id": "e7d5a0b9-d2b2-4072-aa4f-3748609f7cdc",
"metadata": {},
"source": [
"#### Step 18:\n",
"\n",
"Set Dataproc Template Properties"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "73197165-0ea0-4732-841d-8ba4bcd8a94d",
"metadata": {
"tags": []
},
"outputs": [],
"source": [
"PIPELINE_ROOT = GCS_STAGING_LOCATION + \"/pipeline_root/dataproc_pyspark\"\n",
"MAIN_PYTHON_FILE = GCS_STAGING_LOCATION + \"/main.py\"\n",
"JARS = [\"gs://spark-lib/bigquery/spark-bigquery-latest_2.12.jar\"]\n",
"PYTHON_FILE_URIS = [GCS_STAGING_LOCATION + \"/dist/dataproc_templates_distribution.egg\"]"
]
},
{
"attachments": {},
"cell_type": "markdown",
"id": "332168ba-6e69-4f6e-aa03-38b4965b5304",
"metadata": {
"tags": []
},
"source": [
"#### Step 19:\n",
"#### Build pipeline and run Dataproc Template on Vertex AI Pipelines to migrate Hive tables to BigQuery\n",
"\n",
"For this, make sure that the service account used to run the notebook has the following roles:\n",
" - roles/dataproc.editor\n",
" - roles/dataproc.worker"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "095c13bf-c121-4465-88fc-04778ef35a36",
"metadata": {
"tags": []
},
"outputs": [],
"source": [
"runtime_prop={}\n",
"runtime_prop['spark.hadoop.hive.metastore.uris']=HIVE_METASTORE\n",
"runtime_prop['mapreduce.fileoutputcommitter.marksuccessfuljobs'] = \"false\"\n",
"\n",
"def migrate_hive(EXECUTION_LIST):\n",
" EXECUTION_LIST = EXECUTION_LIST\n",
" aiplatform.init(project=PROJECT, staging_bucket=GCS_STAGING_LOCATION)\n",
"\n",
" @dsl.pipeline(\n",
" name=\"hive-to-bq-pyspark\",\n",
" description=\"Pipeline to migrate tables from hive to bq\",\n",
" )\n",
" def pipeline(\n",
" project_id: str = PROJECT,\n",
" location: str = REGION,\n",
" main_python_file_uri: str = MAIN_PYTHON_FILE,\n",
" python_file_uris: list = PYTHON_FILE_URIS,\n",
" jar_file_uris: list = JARS,\n",
" subnetwork_uri: str = SUBNET,\n",
" service_account: str = DATAPROC_SERVICE_ACCOUNT\n",
" ):\n",
" for table in EXECUTION_LIST:\n",
" BATCH_ID = \"hive2bq-{}-{}\".format(table,datetime.now().strftime(\"%s\")).replace('_','-')\n",
" TEMPLATE_SPARK_ARGS = [\n",
" \"--template=HIVETOBIGQUERY\",\n",
" \"--hive.bigquery.input.database={}\".format(INPUT_HIVE_DATABASE),\n",
" \"--hive.bigquery.input.table={}\".format(table),\n",
" \"--hive.bigquery.output.table={}\".format(table),\n",
" \"--hive.bigquery.output.dataset={}\".format(OUTPUT_BIGQUERY_DATASET),\n",
" \"--hive.bigquery.output.mode={}\".format(HIVE_OUTPUT_MODE),\n",
" \"--hive.bigquery.temp.bucket.name={}\".format(TEMP_BUCKET) ]\n",
" _ = DataprocPySparkBatchOp(\n",
" project=project_id,\n",
" location=location,\n",
" batch_id=BATCH_ID,\n",
" main_python_file_uri=main_python_file_uri,\n",
" python_file_uris=python_file_uris,\n",
" jar_file_uris=jar_file_uris,\n",
" subnetwork_uri=subnetwork_uri,\n",
" service_account=service_account,\n",
" runtime_config_properties=runtime_prop,\n",
" runtime_config_version=\"1.1\", # issue 665\n",
" args=TEMPLATE_SPARK_ARGS,\n",
" )\n",
" time.sleep(5)\n",
"\n",
" compiler.Compiler().compile(pipeline_func=pipeline, package_path=\"pipeline.json\")\n",
"\n",
" pipeline = aiplatform.PipelineJob(\n",
" display_name=\"pipeline\",\n",
" template_path=\"pipeline.json\",\n",
" pipeline_root=PIPELINE_ROOT,\n",
" enable_caching=False,\n",
" location=REGION,\n",
" )\n",
" # run() method has an optional parameter `service_account` which you can pass if you want to run pipeline using\n",
" # specific service account instead of default service account \n",
" # eg. pipeline.run(service_account='test@project_id.iam.gserviceaccount.com')\n",
" pipeline.run()"
]
},
{
"attachments": {},
"cell_type": "markdown",
"id": "a7ba4453-ddf9-4f35-901e-ab20433ae7fa",
"metadata": {},
"source": [
"#### Step 20:\n",
"\n",
"Run Dataproc Batch Template based on Hive Tables list calculated in Step 8.\n",
"\n",
"The below cell will call function migrate_hive to migrate tables using dataproc serverless batch job and also add an entry in Audit Table for each Table Set."
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "4580bcc2-9302-48dc-8a47-7ccac603fbd7",
"metadata": {
"tags": []
},
"outputs": [],
"source": [
"AUDIT_DICT={}\n",
"AUDIT_DF = pd.DataFrame(columns=[\"Source_DB_Name\",\"Source_Table_Set\",\"Target_DB_Name\",\"Target_Table_Set\",\"Job_Start_Time\",\"Job_End_Time\",\"Job_Status\"])\n",
" \n",
"for execution_list in JOB_LIST:\n",
" print(\"\\n\\nLoading Table Set: \"+str(execution_list))\n",
" AUDIT_DICT[\"Source_DB_Name\"]=INPUT_HIVE_DATABASE\n",
" AUDIT_DICT[\"Source_Table_Set\"]='|'.join(execution_list)\n",
" AUDIT_DICT[\"Target_DB_Name\"]=OUTPUT_BIGQUERY_DATASET\n",
" AUDIT_DICT[\"Target_Table_Set\"]='|'.join(execution_list)\n",
" AUDIT_DICT[\"Job_Start_Time\"]=str(datetime.now())\n",
" try:\n",
" migrate_hive(execution_list)\n",
" \n",
" except Exception as e:\n",
" AUDIT_DICT[\"Job_Status\"]=\"FAIL\"\n",
" raise Exception(\"\\n\\nSome Error Occured while loading Table Set: \"+str(execution_list)) from e\n",
" else:\n",
" AUDIT_DICT[\"Job_Status\"]=\"PASS\"\n",
" print(\"\\n\\nLoaded Table Set: \"+str(execution_list))\n",
"\n",
" AUDIT_DICT[\"Job_End_Time\"]=str(datetime.now())\n",
" AUDIT_DF=AUDIT_DF.append(AUDIT_DICT, ignore_index = True)\n",
"\n",
"AUDIT_DF_COMBINED = pd.concat([AUDIT_DF, DDL_FAIL_AUDIT_DF], axis=0)\n",
"\n",
"if AUDIT_DF_COMBINED.empty:\n",
" print(\"Audit Dataframe is Empty\")\n",
"else:\n",
" print(AUDIT_DF_COMBINED)\n",
" AUDIT_DF_COMBINED.to_csv(\"gs://\"+TEMP_BUCKET+\"/audit/audit_file_{}.csv\".format(str(datetime.now())),index=False,header = False)"
]
}
],
"metadata": {
"environment": {
"kernel": "python3",
"name": "common-cpu.m107",
"type": "gcloud",
"uri": "gcr.io/deeplearning-platform-release/base-cpu:m107"
},
"kernelspec": {
"display_name": "Python 3 (ipykernel) (Local)",
"language": "python",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.9.9"
},
"vscode": {
"interpreter": {
"hash": "b0fa6594d8f4cbf19f97940f81e996739fb7646882a419484c72d19e05852a7e"
}
}
},
"nbformat": 4,
"nbformat_minor": 5
}