# python/dataproc_templates/hive/util/hive_ddl_extractor.py
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
import argparse
import pprint
from logging import Logger
from typing import Any, Dict, Optional, Sequence
import dataproc_templates.util.template_constants as constants
from dataproc_templates import BaseTemplate
from pyspark.sql import SparkSession
from datetime import datetime
class HiveDDLExtractorTemplate(BaseTemplate):
"""
Dataproc template implementing exports from Hive to BigQuery
"""
@staticmethod
def parse_args(args: Optional[Sequence[str]] = None) -> Dict[str, Any]:
parser: argparse.ArgumentParser = argparse.ArgumentParser()
parser.add_argument(
f'--{constants.HIVE_DDL_EXTRACTOR_INPUT_DATABASE}',
dest=constants.HIVE_DDL_EXTRACTOR_INPUT_DATABASE,
required=True,
            help='Hive database whose table DDLs will be extracted'
)
parser.add_argument(
f'--{constants.HIVE_DDL_EXTRACTOR_OUTPUT_GCS_PATH}',
dest=constants.HIVE_DDL_EXTRACTOR_OUTPUT_GCS_PATH,
required=True,
            help='GCS output path where the extracted DDLs are written'
)
parser.add_argument(
f'--{constants.HIVE_DDL_CONSIDER_SPARK_TABLES}',
dest=constants.HIVE_DDL_CONSIDER_SPARK_TABLES,
required=False,
default=False,
            help='Set to true to extract Spark-format DDL instead of Hive SerDe DDL'
)
parser.add_argument(
f'--{constants.HIVE_DDL_TRANSLATION_DISPOSITION}',
dest=constants.HIVE_DDL_TRANSLATION_DISPOSITION,
required=False,
default=False,
            help='Set to true to remove the LOCATION (and USING) clauses from the Hive DDL, making it compatible with the BigQuery SQL translator'
)
known_args: argparse.Namespace
known_args, _ = parser.parse_known_args(args)
return vars(known_args)
def run(self, spark: SparkSession, args: Dict[str, Any]) -> None:
"""
Dataproc template allowing the extraction of DDLs from Hive Metastore
"""
logger: Logger = self.get_logger(spark=spark)
hive_database: str = args[constants.HIVE_DDL_EXTRACTOR_INPUT_DATABASE]
gcs_output_path: str = args[constants.HIVE_DDL_EXTRACTOR_OUTPUT_GCS_PATH]
        # argparse yields these as strings when passed on the command line
        # (the default is the bool False), hence the str() casts below.
        spark_tbls_flag = args[constants.HIVE_DDL_CONSIDER_SPARK_TABLES]
        remove_location_flag = args[constants.HIVE_DDL_TRANSLATION_DISPOSITION]
logger.info(
"Starting Hive DDL Extraction job with parameters:\n"
f"{pprint.pformat(args)}"
)
        def get_ddl(hive_database, table_name):
            # Plain SHOW CREATE TABLE emits Spark-format DDL; AS SERDE emits
            # Hive-format DDL.
            spark_tbls_opt = "" if str(spark_tbls_flag).upper() == "TRUE" else "AS SERDE"
            ddl_str = spark.sql(
                f"SHOW CREATE TABLE {hive_database}.{table_name} {spark_tbls_opt}"
            ).rdd.map(lambda x: x[0]).collect()[0]
            if str(remove_location_flag).upper() == "TRUE":
                # Drop the LOCATION/USING clauses so the DDL is accepted by
                # the BigQuery SQL translator.
                ddl_str = ddl_str.split("\nLOCATION '")[0].split("\nUSING ")[0]
            return ddl_str + ";"
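        # Illustrative effect of the translation disposition on a hypothetical
        # table:
        #   CREATE TABLE `db`.`t` (`id` INT)
        #   USING parquet
        #   LOCATION 'gs://bucket/t'
        # becomes:
        #   CREATE TABLE `db`.`t` (`id` INT);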
        # Write all DDLs under a timestamped directory beneath the output path.
        ct = datetime.now().strftime("%m-%d-%Y %H.%M.%S")
        output_path = f"{gcs_output_path}/{hive_database}/{ct}"
        table_names = spark.sql(f"SHOW TABLES IN {hive_database}") \
            .select("tableName").rdd.map(lambda x: x[0]).collect()
        tables_ddls = [get_ddl(hive_database, table_name) for table_name in table_names]
        ddls_rdd = spark.sparkContext.parallelize(tables_ddls)
        ddls_rdd.coalesce(1).saveAsTextFile(output_path)
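        # Note: saveAsTextFile creates a directory at output_path; because the
        # RDD is coalesced to one partition, it contains a single part file
        # (part-00000) plus a _SUCCESS marker, with the DDL statements
        # (possibly multi-line) written in sequence.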