migration_toolkit/migrate_table.py

# Copyright 2023 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import argparse
import logging
import sys

from common.migration_mode import MigrationMode
from common.monitoring_consts import LABEL_KEY, LABEL_VALUE, USER_AGENT
from executors.copy_rows import execute_copy_rows
from executors.create_table import execute_create_table
from executors.discover import execute_discover
from executors.fetch_bigquery_table_ddl import execute_fetch_bigquery_table_ddl
from executors.get_bigquery_table import execute_get_bigquery_table
from executors.update_stream import execute_update_stream
from google.api_core.gapic_v1.client_info import ClientInfo
from google.cloud import bigquery
from google.cloud.bigquery.table import Table
from google.cloud.datastream_v1.types import Stream
from migration_config import get_config
from sql_generators.copy_rows.copy_rows import CopyDataSQLGenerator
from sql_generators.create_table.dynamic_datasets_create_table import (
    DynamicDatasetsCreateTable,
)
from sql_generators.create_table.single_dataset_create_table import (
    SingleDatasetCreateTable,
)
from sql_generators.fetch_bigquery_table_ddl.fetch_bigquery_table_ddl import (
    BigQueryTableDDLFetcher,
)

logger = logging.getLogger(__name__)

WANTED_USER_PROMPT = "go"


def main():
    config: argparse.Namespace = get_config()
    logger.debug(f"Using config {vars(config)}")

    _add_stream_label(
        stream=config.stream,
        datastream_api_endpoint_override=config.datastream_api_endpoint_override,
    )

    # Run Datastream's discover on the connection profile and save the
    # response to a file.
    execute_discover(
        connection_profile_name=config.connection_profile_name,
        source_schema_name=config.source_schema_name,
        source_table_name=config.source_table_name,
        source_type=config.source_type,
        datastream_api_endpoint_override=config.datastream_api_endpoint_override,
        filepath=config.discover_result_filepath,
    )

    bigquery_client = bigquery.Client(
        client_info=ClientInfo(user_agent=USER_AGENT)
    )

    if config.single_target_stream:
        # Generate CREATE TABLE DDL for a single-dataset stream and save it
        # to a file.
        table_creator = SingleDatasetCreateTable(
            source_type=config.source_type,
            discover_result_path=config.discover_result_filepath,
            create_target_table_ddl_filepath=config.create_target_table_ddl_filepath,
            source_table_name=config.source_table_name,
            source_schema_name=config.source_schema_name,
            bigquery_dataset_name=config.bigquery_target_dataset_name,
            bigquery_max_staleness_seconds=config.bigquery_max_staleness_seconds,
            project_id=config.project_id,
        )
    else:
        # Generate CREATE TABLE DDL for a dynamic-datasets stream and save it
        # to a file.
        table_creator = DynamicDatasetsCreateTable(
            source_type=config.source_type,
            discover_result_path=config.discover_result_filepath,
            create_target_table_ddl_filepath=config.create_target_table_ddl_filepath,
            source_table_name=config.source_table_name,
            source_schema_name=config.source_schema_name,
            bigquery_max_staleness_seconds=config.bigquery_max_staleness_seconds,
            project_id=config.project_id,
            bigquery_region=config.bigquery_region,
            bigquery_kms_key_name=config.bigquery_kms_key_name,
            bigquery_dataset_name=config.bigquery_target_dataset_name,
        )
    table_creator.generate_ddl()
    table_id = table_creator.get_fully_qualified_bigquery_table_name()

    if (
        config.migration_mode == MigrationMode.CREATE_TABLE
        or config.migration_mode == MigrationMode.FULL
    ):
        _verify_bigquery_table_not_exist(
            table_id=table_id, bigquery_client=bigquery_client
        )
        _wait_for_user_prompt_if_necessary(
            "Creating BigQuery table", config.force
        )
        # Run the DDL on BigQuery.
        execute_create_table(
            filepath=config.create_target_table_ddl_filepath,
            bigquery_client=bigquery_client,
        )

    # Generate the SQL statement for fetching the source BigQuery table DDL
    # and save it to a file.
    BigQueryTableDDLFetcher(
        project_id=config.project_id,
        dataset=config.bigquery_source_dataset_name,
        table=config.bigquery_source_table_name,
        filepath=config.fetch_bigquery_source_table_ddl_filepath,
    ).fetch_table_schema()

    # Run the SQL statement and save the DDL to a file.
    execute_fetch_bigquery_table_ddl(
        sql_filepath=config.fetch_bigquery_source_table_ddl_filepath,
        output_path=config.create_source_table_ddl_filepath,
        bigquery_client=bigquery_client,
    )

    # Generate the copy-rows SQL statement and save it to a file.
    CopyDataSQLGenerator(
        source_bigquery_table_ddl=config.create_source_table_ddl_filepath,
        destination_bigquery_table_ddl=config.create_target_table_ddl_filepath,
        filepath=config.copy_rows_filepath,
    ).generate_sql()

    if config.migration_mode == MigrationMode.FULL:
        _wait_for_user_prompt_if_necessary(
            "Copying rows from"
            f" {config.project_id}.{config.bigquery_source_dataset_name}.{config.bigquery_source_table_name} to"
            f" {table_id}",
            config.force,
        )
        # Run the SQL statement that copies the rows.
        execute_copy_rows(
            config.copy_rows_filepath, bigquery_client=bigquery_client
        )

    if config.migration_mode == MigrationMode.DRY_RUN:
        logger.info(
            "Dry run finished successfully.\nGenerated `CREATE TABLE` DDL at"
            f" '{config.create_target_table_ddl_filepath}'.\nGenerated copy rows"
            f" SQL at '{config.copy_rows_filepath}'."
        )
    elif config.migration_mode == MigrationMode.CREATE_TABLE:
        logger.info(
            "Table created successfully.\n"
            f"New table name is `{table_id}`.\n"
            f"Generated copy rows SQL at '{config.copy_rows_filepath}'."
        )
    else:
        logger.info(
            f"Migration finished successfully. New table name is `{table_id}`."
        )


def _add_stream_label(stream: Stream, datastream_api_endpoint_override: str):
    stream.labels[LABEL_KEY] = LABEL_VALUE
    execute_update_stream(
        stream=stream,
        datastream_api_endpoint_override=datastream_api_endpoint_override,
    )


def _verify_bigquery_table_not_exist(
    table_id: str, bigquery_client: bigquery.Client
):
    table: Table = execute_get_bigquery_table(
        table_id, bigquery_client=bigquery_client
    )
    # The table already exists; exit to avoid data corruption.
    if table:
        logger.error(
            f"ERROR: Table {table_id} already exists. Drop the table and"
            " rerun the migration."
        )
        sys.exit(1)


def _wait_for_user_prompt_if_necessary(msg: str, force: bool):
    if force:
        logger.info(msg + ".")
    else:
        prompt = ""
        while WANTED_USER_PROMPT != prompt:
            prompt = input(f"{msg}. Type '{WANTED_USER_PROMPT}' to continue: ")


if __name__ == "__main__":
    main()
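# Note on common.migration_mode: the module is imported above but is not part
# of this file. A minimal sketch, assuming only the three members that main()
# actually references (DRY_RUN, CREATE_TABLE, FULL); the real definition may
# differ:
#
#     import enum
#
#     class MigrationMode(enum.Enum):
#         DRY_RUN = "DRY_RUN"            # generate DDL and copy-rows SQL only
#         CREATE_TABLE = "CREATE_TABLE"  # also create the target table
#         FULL = "FULL"                  # also copy rows into the new table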