def main()

in migration_toolkit/migrate_table.py [0:0]


def main():
  """Drive the table migration flow described by the parsed configuration.

  The steps run in this order:
    1. Load the configuration via `get_config` and call `_add_stream_label`
       for the configured stream.
    2. Run Datastream's discover on the connection profile and save the
       response to a file.
    3. Generate the target `CREATE TABLE` DDL file, using either the single
       dataset or the dynamic datasets generator depending on
       `config.single_target_stream`.
    4. In CREATE_TABLE and FULL modes: call
       `_verify_bigquery_table_not_exist`, prompt the user if necessary, and
       run the DDL on BigQuery.
    5. Fetch the source BigQuery table DDL and generate the copy rows SQL
       file.
    6. In FULL mode: prompt the user if necessary and run the copy rows SQL.
    7. Log a summary message matching the migration mode.
  """
  config: argparse.Namespace = get_config()
  logger.debug(f"Using config {vars(config)}")

  _add_stream_label(
      stream=config.stream,
      datastream_api_endpoint_override=config.datastream_api_endpoint_override,
  )

  # Datastream discover: the response is written to the discover result file,
  # which the DDL generator below reads back.
  execute_discover(
      connection_profile_name=config.connection_profile_name,
      source_schema_name=config.source_schema_name,
      source_table_name=config.source_table_name,
      source_type=config.source_type,
      datastream_api_endpoint_override=config.datastream_api_endpoint_override,
      filepath=config.discover_result_filepath,
  )

  bq_client = bigquery.Client(client_info=ClientInfo(user_agent=USER_AGENT))

  # Pick the DDL generator flavor; only the dynamic datasets generator takes
  # the region and KMS key settings.
  if config.single_target_stream:
    creator_cls = SingleDatasetCreateTable
    flavor_kwargs = {}
  else:
    creator_cls = DynamicDatasetsCreateTable
    flavor_kwargs = {
        "bigquery_region": config.bigquery_region,
        "bigquery_kms_key_name": config.bigquery_kms_key_name,
    }

  target_ddl_path = config.create_target_table_ddl_filepath
  table_creator = creator_cls(
      source_type=config.source_type,
      discover_result_path=config.discover_result_filepath,
      create_target_table_ddl_filepath=target_ddl_path,
      source_table_name=config.source_table_name,
      source_schema_name=config.source_schema_name,
      bigquery_dataset_name=config.bigquery_target_dataset_name,
      bigquery_max_staleness_seconds=config.bigquery_max_staleness_seconds,
      project_id=config.project_id,
      **flavor_kwargs,
  )

  # Generate the CREATE TABLE DDL and save it to a file.
  table_creator.generate_ddl()
  target_table_id = table_creator.get_fully_qualified_bigquery_table_name()

  mode = config.migration_mode
  creates_table = (
      mode == MigrationMode.CREATE_TABLE or mode == MigrationMode.FULL
  )
  if creates_table:
    _verify_bigquery_table_not_exist(
        table_id=target_table_id, bigquery_client=bq_client
    )
    _wait_for_user_prompt_if_necessary("Creating BigQuery table", config.force)
    # Run the generated DDL on BigQuery.
    execute_create_table(filepath=target_ddl_path, bigquery_client=bq_client)

  # Generate the SQL statement that fetches the source BigQuery table DDL and
  # save it to a file.
  ddl_fetcher = BigQueryTableDDLFetcher(
      project_id=config.project_id,
      dataset=config.bigquery_source_dataset_name,
      table=config.bigquery_source_table_name,
      filepath=config.fetch_bigquery_source_table_ddl_filepath,
  )
  ddl_fetcher.fetch_table_schema()

  # Run that SQL statement and save the resulting DDL to a file.
  source_ddl_path = config.create_source_table_ddl_filepath
  execute_fetch_bigquery_table_ddl(
      sql_filepath=config.fetch_bigquery_source_table_ddl_filepath,
      output_path=source_ddl_path,
      bigquery_client=bq_client,
  )

  # Generate the copy rows SQL statement and save it to a file.
  copy_sql_path = config.copy_rows_filepath
  copy_sql_generator = CopyDataSQLGenerator(
      source_bigquery_table_ddl=source_ddl_path,
      destination_bigquery_table_ddl=target_ddl_path,
      filepath=copy_sql_path,
  )
  copy_sql_generator.generate_sql()

  if mode == MigrationMode.FULL:
    source_table_id = (
        f"{config.project_id}"
        f".{config.bigquery_source_dataset_name}"
        f".{config.bigquery_source_table_name}"
    )
    _wait_for_user_prompt_if_necessary(
        f"Copying rows from {source_table_id} to {target_table_id}",
        config.force,
    )
    # Run the SQL statement that copies the rows.
    execute_copy_rows(copy_sql_path, bigquery_client=bq_client)

  # Report what was done, depending on the mode.
  if mode == MigrationMode.DRY_RUN:
    summary = (
        "Dry run finished successfully.\n"
        f"Generated `CREATE TABLE` DDL at '{target_ddl_path}'.\n"
        f"Generated copy rows SQL at '{copy_sql_path}'."
    )
  elif mode == MigrationMode.CREATE_TABLE:
    summary = (
        "Table created successfully.\n"
        f"New table name is `{target_table_id}`.\n"
        f"Generated copy rows SQL at '{copy_sql_path}'."
    )
  else:
    summary = (
        "Migration finished successfully."
        f" New table name is `{target_table_id}`"
    )
  logger.info(summary)