def main()

in tools/hive-bigquery/hive_to_bigquery/__main__.py


def main():
    """Migrates Hive tables to BigQuery.

    Establishes connections to Hive, MySQL, GCS, and BigQuery. Validates the
    user arguments and resumes migration from previous runs, if any.
    """

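    # Parses the command-line arguments into the input configuration; the
    # exact contents are defined by init_script.initialize_variables().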
    try:
        input_config = init_script.initialize_variables()
    except custom_exceptions.ArgumentInitializationError as error:
        raise RuntimeError from error

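    # Instantiating PropertiesReader presumably caches the configuration at
    # class level, since later lookups go through the static
    # PropertiesReader.get() rather than an instance.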
    logger.debug("Initializing Properties Reader")
    PropertiesReader(input_config)

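    # Opens connections to GCS, Cloud SQL (MySQL), BigQuery, and Hive.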
    try:
        (gcs_component, mysql_component, bq_component,
         hive_component) = initialize_components()
    except custom_exceptions.ConnectionError as error:
        raise RuntimeError from error

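    # Fails fast if the tracking metatable is missing from Cloud SQL.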
    try:
        mysql_component.check_table_exists(
            PropertiesReader.get('tracking_metatable_name'))
    except (exceptions.NotFound,
            custom_exceptions.MySQLExecutionError) as error:
        raise RuntimeError from error

    try:
        # Validates the user-provided resources.
        logger.debug("Validating the resources")
        if ResourceValidator.validate(hive_component, gcs_component,
                                      bq_component):
            logger.debug("All the provided resources are valid")
        else:
            logger.error("Check the provided resources")
            logger.info("Check the log file for detailed errors")
            raise RuntimeError
    except custom_exceptions.CustomBaseError as error:
        raise RuntimeError from error

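    # Gathers metadata about the source Hive table (database, table name,
    # and the incremental column, if any).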
    try:
        hive_table_object = HiveTable(hive_component,
                                      PropertiesReader.get('hive_database'),
                                      PropertiesReader.get('hive_table_name'),
                                      PropertiesReader.get('incremental_col'))
    except custom_exceptions.HiveExecutionError as error:
        raise RuntimeError from error

    # Wrapper to describe Hive table resource.
    hive_table_model = hive_table_object.hive_table_model
    logger.debug(hive_table_model)

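    # Builds the target BigQuery table description from the Hive table model.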
    bq_table_object = BigQueryTable(PropertiesReader.get('dataset_id'),
                                    PropertiesReader.get('bq_table'),
                                    hive_table_model)
    # Wrapper to describe BigQuery table resource.
    bq_table_model = bq_table_object.bq_table_model
    logger.debug(bq_table_model)

    try:
        # Verifies whether a tracking table exists from a previous run.
        mysql_component.check_tracking_table_exists(hive_table_model)
    except custom_exceptions.MySQLExecutionError as error:
        raise RuntimeError from error

    try:
        # Validates the bq_table_write_mode provided by the user.
        bq_component.check_bq_write_mode(mysql_component, hive_table_model,
                                         bq_table_model)
    except (custom_exceptions.CustomBaseError, exceptions.NotFound,
            exceptions.AlreadyExists) as error:
        raise RuntimeError from error

    # is_first_run is True when the source Hive table is being migrated for
    # the first time.
    if hive_table_model.is_first_run:
        logger.debug("Migrating for the first time")
        try:
            # Gets information on data to migrate and creates tracking table
            # in Cloud SQL.
            tracking_data = hive_component.get_info_on_data_to_migrate(
                hive_table_model)
        except (custom_exceptions.IncrementalColumnError,
                custom_exceptions.HiveExecutionError) as error:
            raise RuntimeError from error

        try:
            mysql_component.create_tracking_table(hive_table_model)
        except custom_exceptions.MySQLExecutionError as error:
            raise RuntimeError from error

        try:
            # Migrates data to BigQuery.
            hive_component.migrate_data(
                mysql_component, bq_component, gcs_component,
                hive_table_model, bq_table_model,
                PropertiesReader.get('gcs_bucket_name'), tracking_data)
        except (custom_exceptions.HiveExecutionError,
                custom_exceptions.HDFSCommandError,
                custom_exceptions.MySQLExecutionError) as error:
            raise RuntimeError from error

        try:
            # Updates BigQuery job status and waits for all the jobs to
            # finish.
            bq_component.update_bq_job_status(
                mysql_component, gcs_component, hive_table_model,
                bq_table_model, PropertiesReader.get('gcs_bucket_name'))
        except custom_exceptions.MySQLExecutionError as error:
            raise RuntimeError from error

    else:
        logger.info(
            "Tracking table already exists. Continuing from the previous "
            "iteration...")
        try:
            # Copies the pending files from the previous run to GCS, loads
            # them to BigQuery, and updates the BigQuery load job status.
            gcs_component.stage_to_gcs(mysql_component, bq_component,
                                       hive_table_model, bq_table_model,
                                       PropertiesReader.get('gcs_bucket_name'))
            bq_component.load_gcs_to_bq(mysql_component, hive_table_model,
                                        bq_table_model)
            bq_component.update_bq_job_status(
                mysql_component, gcs_component, hive_table_model,
                bq_table_model, PropertiesReader.get('gcs_bucket_name'))
        except custom_exceptions.MySQLExecutionError as error:
            raise RuntimeError from error

    try:
        # Checks for new data in the Hive table.
        tracking_data = hive_component.check_inc_data(
            mysql_component, bq_component, gcs_component, hive_table_model,
            bq_table_model, PropertiesReader.get('gcs_bucket_name'))
    except (custom_exceptions.HiveExecutionError,
            custom_exceptions.MySQLExecutionError, TypeError) as error:
        raise RuntimeError from error

    if tracking_data:
        try:
            # Migrates data to BigQuery and updates the job status in the
            # tracking table.
            hive_component.migrate_data(
                mysql_component, bq_component, gcs_component,
                hive_table_model, bq_table_model,
                PropertiesReader.get('gcs_bucket_name'), tracking_data)
        except (custom_exceptions.HiveExecutionError,
                custom_exceptions.MySQLExecutionError) as error:
            raise RuntimeError from error
        try:
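            # Waits for the incremental load jobs to finish and records their
            # status in the tracking table.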
            bq_component.update_bq_job_status(
                mysql_component, gcs_component, hive_table_model,
                bq_table_model, PropertiesReader.get('gcs_bucket_name'))
        except custom_exceptions.MySQLExecutionError as error:
            raise RuntimeError from error

    try:
        # Compares the row counts of the Hive and BigQuery tables and
        # creates a metrics table if they match.
        compare_row_counts(bq_component, hive_component, gcs_component,
                           hive_table_model, bq_table_model)
    except custom_exceptions.HiveExecutionError as error:
        raise RuntimeError from error
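

# Not shown in this excerpt: as this module is a package __main__.py, it is
# presumably run via `python -m hive_to_bigquery` and ends with the standard
# entry-point guard. A minimal sketch, assuming no extra setup is needed:
if __name__ == '__main__':
    main()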