notebooks/util/sql_translation.py:

from google.cloud import bigquery_migration_v2
import logging


def create_migration_workflow(
        gcs_input_path: str,
        gcs_output_path: str,
        project_id: str,
        bq_dataset: str,
        default_database: str,
        source_dilect: str,
        bq_region: str,
        obj_name_mapping: str = None) -> tuple:
    """
    Creates a Batch SQL Translation migration workflow using the BigQuery
    Migration API to convert DDLs/SQLs from a source dialect to BigQuery SQL.

    Args:
        gcs_input_path (string): Cloud Storage location where the source DDLs are available
        gcs_output_path (string): Output Cloud Storage location
        project_id (string): Project ID
        bq_dataset (string): BigQuery dataset ID to be added in the final DDL
        default_database (string): Project ID to be added in the final DDL
        source_dilect (string): Can be hive|redshift|netezza|teradata|synapse|mysql|oracle|postgresql|presto|sparksql|sqlserver|vertica
        bq_region (string): Region of the BigQuery dataset
        obj_name_mapping: The mapping of objects to their desired output names, in list form

    Returns:
        name (string): Full name of the migration job
        state (string): Job run state
    """

    LOGGER: logging.Logger = logging.getLogger('dataproc_templates')

    parent = f"projects/{project_id}/locations/{bq_region}"

    # Construct a BigQuery Migration client object.
    client = bigquery_migration_v2.MigrationServiceClient()

    source_dialect = bigquery_migration_v2.Dialect()

    # Set the source dialect. Only Hive SQL is currently implemented here;
    # other dialects listed in the docstring would need their own Dialect setup.
    if source_dilect == "hive":
        source_dialect.hiveql_dialect = bigquery_migration_v2.HiveQLDialect()
        migration_type = "Translation_HiveQL2BQ"
    else:
        raise ValueError(f"Unsupported source dialect: {source_dilect}")

    # Set the target dialect to the BigQuery dialect.
    target_dialect = bigquery_migration_v2.Dialect()
    target_dialect.bigquery_dialect = bigquery_migration_v2.BigQueryDialect()

    # Prepare the config proto.
    translation_config = bigquery_migration_v2.TranslationConfigDetails(
        gcs_source_path=gcs_input_path,
        gcs_target_path=gcs_output_path,
        source_dialect=source_dialect,
        target_dialect=target_dialect,
        source_env={"default_database": default_database,
                    "schema_search_path": [bq_dataset]},
        name_mapping_list=obj_name_mapping
    )

    # Prepare the task.
    migration_task = bigquery_migration_v2.MigrationTask(
        type_=migration_type,
        translation_config_details=translation_config)

    # Prepare the workflow.
    workflow = bigquery_migration_v2.MigrationWorkflow(
        display_name=f"workflow-python-{source_dilect}2bq")

    workflow.tasks["translation-task"] = migration_task  # type: ignore

    # Prepare the API request to create a migration workflow.
    request = bigquery_migration_v2.CreateMigrationWorkflowRequest(
        parent=parent,
        migration_workflow=workflow,
    )

    response = client.create_migration_workflow(request=request)

    LOGGER.info("Created workflow: " + str(response.display_name))
    LOGGER.info("Current state: " + str(response.State(response.state)))

    return response.name, response.state


def get_migration_workflow_status(name):
    """
    Returns the current state of a BigQuery translation API job.

    Args:
        name (string): Full name of the migration job

    Returns:
        response: The MigrationWorkflow object, including its run state
    """

    LOGGER: logging.Logger = logging.getLogger('dataproc_templates')

    # Create a client.
    client = bigquery_migration_v2.MigrationServiceClient()

    # Initialize the request argument(s).
    request = bigquery_migration_v2.GetMigrationWorkflowRequest(
        name=name,
    )

    # Make the request.
    response = client.get_migration_workflow(request=request)

    LOGGER.info(str(response.state))

    # Return the response.
    return response
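

# --- Usage sketch (illustrative, not part of the module) --------------------
# A minimal example of how a caller might wire these helpers together: submit
# a Hive-to-BigQuery translation job, then poll it until it completes. All of
# the values below (bucket paths, project, dataset, region) are hypothetical
# placeholders, and the polling loop is an assumption about how
# get_migration_workflow_status would be used, not something defined by this
# module.
#
# import time
# from google.cloud import bigquery_migration_v2
# from notebooks.util.sql_translation import (
#     create_migration_workflow, get_migration_workflow_status)
#
# job_name, _ = create_migration_workflow(
#     gcs_input_path="gs://my-bucket/hive_ddl/",   # hypothetical
#     gcs_output_path="gs://my-bucket/bq_ddl/",    # hypothetical
#     project_id="my-project",                     # hypothetical
#     bq_dataset="my_dataset",                     # hypothetical
#     default_database="my-project",
#     source_dilect="hive",
#     bq_region="us",
# )
#
# while True:
#     workflow = get_migration_workflow_status(job_name)
#     if workflow.state == bigquery_migration_v2.MigrationWorkflow.State.COMPLETED:
#         break
#     time.sleep(30)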