in migration_toolkit/migrate_table.py [0:0]
def main():
    """Drive one table migration end to end, according to the parsed config.

    Steps, in order:
      1. Load the configuration and call `_add_stream_label` for the stream.
      2. Run discover on the connection profile and save the result to a file.
      3. Generate the target table's `CREATE TABLE` DDL file.
      4. In CREATE_TABLE and FULL modes: verify the target table does not
         exist, prompt the user if necessary, and execute the DDL.
      5. Fetch the source BigQuery table DDL and generate the copy-rows SQL
         file (performed in every mode).
      6. In FULL mode only: prompt the user if necessary and copy the rows.
      7. Log a summary that depends on the migration mode.
    """
    config: argparse.Namespace = get_config()
    logger.debug(f"Using config {vars(config)}")

    _add_stream_label(
        stream=config.stream,
        datastream_api_endpoint_override=config.datastream_api_endpoint_override,
    )

    # Run Datastream's discover on the connection profile; the response is
    # saved to `discover_result_filepath`.
    execute_discover(
        connection_profile_name=config.connection_profile_name,
        source_schema_name=config.source_schema_name,
        source_table_name=config.source_table_name,
        source_type=config.source_type,
        datastream_api_endpoint_override=config.datastream_api_endpoint_override,
        filepath=config.discover_result_filepath,
    )

    bq_client = bigquery.Client(client_info=ClientInfo(user_agent=USER_AGENT))

    # Keyword arguments accepted by both DDL builders.
    shared_ddl_kwargs = dict(
        source_type=config.source_type,
        discover_result_path=config.discover_result_filepath,
        create_target_table_ddl_filepath=config.create_target_table_ddl_filepath,
        source_table_name=config.source_table_name,
        source_schema_name=config.source_schema_name,
        bigquery_dataset_name=config.bigquery_target_dataset_name,
        bigquery_max_staleness_seconds=config.bigquery_max_staleness_seconds,
        project_id=config.project_id,
    )
    if config.single_target_stream:
        # Single dataset stream: the shared arguments are all that is passed.
        ddl_builder = SingleDatasetCreateTable(**shared_ddl_kwargs)
    else:
        # Dynamic dataset stream: region and KMS key are passed as well.
        ddl_builder = DynamicDatasetsCreateTable(
            bigquery_region=config.bigquery_region,
            bigquery_kms_key_name=config.bigquery_kms_key_name,
            **shared_ddl_kwargs,
        )

    # Generate the CREATE TABLE DDL and save it to a file.
    ddl_builder.generate_ddl()
    table_id = ddl_builder.get_fully_qualified_bigquery_table_name()

    mode = config.migration_mode

    if mode in (MigrationMode.CREATE_TABLE, MigrationMode.FULL):
        _verify_bigquery_table_not_exist(
            table_id=table_id, bigquery_client=bq_client
        )
        _wait_for_user_prompt_if_necessary(
            "Creating BigQuery table", config.force
        )
        # Run the DDL on BigQuery.
        execute_create_table(
            filepath=config.create_target_table_ddl_filepath,
            bigquery_client=bq_client,
        )

    # Generate the SQL statement that fetches the source BigQuery table DDL
    # and save it to a file.
    ddl_fetcher = BigQueryTableDDLFetcher(
        project_id=config.project_id,
        dataset=config.bigquery_source_dataset_name,
        table=config.bigquery_source_table_name,
        filepath=config.fetch_bigquery_source_table_ddl_filepath,
    )
    ddl_fetcher.fetch_table_schema()

    # Run that SQL statement and save the resulting DDL to a file.
    execute_fetch_bigquery_table_ddl(
        sql_filepath=config.fetch_bigquery_source_table_ddl_filepath,
        output_path=config.create_source_table_ddl_filepath,
        bigquery_client=bq_client,
    )

    # Generate the copy-rows SQL statement and save it to a file.
    copy_sql_generator = CopyDataSQLGenerator(
        source_bigquery_table_ddl=config.create_source_table_ddl_filepath,
        destination_bigquery_table_ddl=config.create_target_table_ddl_filepath,
        filepath=config.copy_rows_filepath,
    )
    copy_sql_generator.generate_sql()

    if mode == MigrationMode.FULL:
        copy_prompt = (
            "Copying rows from"
            f" {config.project_id}.{config.bigquery_source_dataset_name}.{config.bigquery_source_table_name} to"
            f" {table_id}"
        )
        _wait_for_user_prompt_if_necessary(copy_prompt, config.force)
        # Run the SQL statement that copies the rows.
        execute_copy_rows(config.copy_rows_filepath, bigquery_client=bq_client)

    # Pick the closing message for the mode that was run, then log it once.
    if mode == MigrationMode.DRY_RUN:
        summary = (
            "Dry run finished successfully.\nGenerated `CREATE TABLE` DDL at"
            f" '{config.create_target_table_ddl_filepath}'.\nGenerated copy rows"
            f" SQL at '{config.copy_rows_filepath}'."
        )
    elif mode == MigrationMode.CREATE_TABLE:
        summary = (
            "Table created successfully.\n"
            f"New table name is `{table_id}`.\n"
            f"Generated copy rows SQL at '{config.copy_rows_filepath}'."
        )
    else:
        summary = (
            f"Migration finished successfully. New table name is `{table_id}`"
        )
    logger.info(summary)