tools/hive-bigquery/hive_to_bigquery/__main__.py
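# Assumed imports for the names used below. google.api_core is the standard
# home of NotFound/AlreadyExists; the local module paths are illustrative
# guesses and may not match the actual package layout.
import logging

from google.api_core import exceptions

import custom_exceptions
import init_script
from bigquery_table import BigQueryTable
from hive_table import HiveTable
from properties_reader import PropertiesReader
from resource_validator import ResourceValidator
from utilities import compare_row_counts, initialize_components

logger = logging.getLogger(__name__)
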
def main():
"""Migrates Hive tables to BigQuery.
Establishes connection to Hive, MySQL, GCS and BigQuery. Validates the
user arguments and continues migration from the previous runs, if any.
"""
try:
input_config = init_script.initialize_variables()
except custom_exceptions.ArgumentInitializationError as error:
raise RuntimeError from error
logger.debug("Initializing Properties Reader")
PropertiesReader(input_config)
try:
        (gcs_component, mysql_component,
         bq_component, hive_component) = initialize_components()
except custom_exceptions.ConnectionError as error:
raise RuntimeError from error
try:
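        # Ensures the tracking metatable exists in Cloud SQL before proceeding.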
mysql_component.check_table_exists(
PropertiesReader.get('tracking_metatable_name'))
except (exceptions.NotFound,
custom_exceptions.MySQLExecutionError) as error:
raise RuntimeError from error
try:
        # Validates the user-provided resources.
logger.debug("Validating the resources")
if ResourceValidator.validate(hive_component, gcs_component,
bq_component):
logger.debug("All the provided resources are valid")
else:
logger.error("Check the provided resources")
logger.info("Check the log file for detailed errors")
raise RuntimeError
except custom_exceptions.CustomBaseError as error:
raise RuntimeError from error
try:
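        # Builds a wrapper describing the source Hive table; constructing it
        # presumably issues Hive queries, hence the HiveExecutionError handling.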
hive_table_object = HiveTable(hive_component,
PropertiesReader.get('hive_database'),
PropertiesReader.get('hive_table_name'),
PropertiesReader.get('incremental_col'))
except custom_exceptions.HiveExecutionError as error:
raise RuntimeError from error
# Wrapper to describe Hive table resource.
hive_table_model = hive_table_object.hive_table_model
logger.debug(hive_table_model)
bq_table_object = BigQueryTable(PropertiesReader.get('dataset_id'),
PropertiesReader.get('bq_table'),
hive_table_model)
# Wrapper to describe BigQuery table resource.
bq_table_model = bq_table_object.bq_table_model
logger.debug(bq_table_model)
try:
        # Verifies whether the tracking table exists from a previous run.
mysql_component.check_tracking_table_exists(hive_table_model)
except custom_exceptions.MySQLExecutionError as error:
raise RuntimeError from error
try:
# Validates the bq_table_write_mode provided by the user.
bq_component.check_bq_write_mode(mysql_component, hive_table_model,
bq_table_model)
except (custom_exceptions.CustomBaseError, exceptions.NotFound,
exceptions.AlreadyExists) as error:
raise RuntimeError from error
    # is_first_run is True when the source Hive table is being migrated for
    # the first time.
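    # First run: snapshot the data to migrate and create the tracking table.
    # Later runs: resume pending loads, then migrate only newly arrived data.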
if hive_table_model.is_first_run:
logger.debug("Migrating for the first time")
try:
            # Gets information on the data to migrate.
tracking_data = hive_component.get_info_on_data_to_migrate(
hive_table_model)
except (custom_exceptions.IncrementalColumnError,
custom_exceptions.HiveExecutionError) as error:
raise RuntimeError from error
try:
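            # Creates the tracking table in Cloud SQL.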
mysql_component.create_tracking_table(hive_table_model)
except custom_exceptions.MySQLExecutionError as error:
raise RuntimeError from error
try:
# Migrates data to BigQuery.
hive_component.migrate_data(
mysql_component, bq_component, gcs_component,
hive_table_model, bq_table_model,
PropertiesReader.get('gcs_bucket_name'), tracking_data)
except (custom_exceptions.HiveExecutionError,
custom_exceptions.HDFSCommandError,
custom_exceptions.MySQLExecutionError) as error:
raise RuntimeError from error
try:
            # Updates the BigQuery job status and waits for all jobs to finish.
bq_component.update_bq_job_status(
mysql_component, gcs_component, hive_table_model,
bq_table_model, PropertiesReader.get('gcs_bucket_name'))
except custom_exceptions.MySQLExecutionError as error:
raise RuntimeError from error
else:
logger.info(
"Tracking table already exists. Continuing from the previous "
"iteration...")
try:
            # Copies the pending files from the previous run to GCS, loads
            # them into BigQuery, and updates the load job status.
gcs_component.stage_to_gcs(mysql_component, bq_component,
hive_table_model, bq_table_model,
PropertiesReader.get('gcs_bucket_name'))
bq_component.load_gcs_to_bq(mysql_component, hive_table_model,
bq_table_model)
bq_component.update_bq_job_status(
mysql_component, gcs_component, hive_table_model,
bq_table_model, PropertiesReader.get('gcs_bucket_name'))
except custom_exceptions.MySQLExecutionError as error:
raise RuntimeError from error
try:
# Checks for new data in the Hive table.
tracking_data = hive_component.check_inc_data(
mysql_component, bq_component, gcs_component, hive_table_model,
bq_table_model, PropertiesReader.get('gcs_bucket_name'))
except (custom_exceptions.HiveExecutionError,
custom_exceptions.MySQLExecutionError, TypeError) as error:
raise RuntimeError from error
if tracking_data:
        # Migrates data to BigQuery and updates the job status in the
        # tracking table.
try:
hive_component.migrate_data(
mysql_component, bq_component, gcs_component,
hive_table_model, bq_table_model,
PropertiesReader.get('gcs_bucket_name'), tracking_data)
except (custom_exceptions.HiveExecutionError,
custom_exceptions.MySQLExecutionError) as error:
raise RuntimeError from error
try:
bq_component.update_bq_job_status(
mysql_component, gcs_component, hive_table_model,
bq_table_model, PropertiesReader.get('gcs_bucket_name'))
except custom_exceptions.MySQLExecutionError as error:
raise RuntimeError from error
try:
        # Compares the number of rows in the BigQuery and Hive tables and
        # creates the metrics table if the counts match.
compare_row_counts(bq_component, hive_component, gcs_component,
hive_table_model, bq_table_model)
except custom_exceptions.HiveExecutionError as error:
raise RuntimeError from error
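

# Conventional entry point for "python -m hive_to_bigquery"; assumed here,
# since __main__.py modules typically end with this guard.
if __name__ == '__main__':
    main()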