in migration_toolkit/migration_config.py [0:0]
def _get_filepaths(args):
    """Add the migration's working-file paths to ``args`` in place.

    Each path is a module-level directory constant joined with a filename
    rendered from the matching module-level filename template.

    Keys read from ``args``:
        connection_profile_name, source_schema_name, source_table_name,
        project_id, bigquery_target_dataset_name, bigquery_target_table_name,
        bigquery_source_dataset_name, bigquery_source_table_name.

    Keys written to ``args``:
        discover_result_filepath, fetch_bigquery_source_table_ddl_filepath,
        create_target_table_ddl_filepath, create_source_table_ddl_filepath,
        copy_rows_filepath.

    Raises:
        KeyError: if one of the keys read above is missing from ``args``.
        ValueError: if ``connection_profile_name`` does not split into exactly
            two parts around "/connectionProfiles/".
    """
    # Only the part after "/connectionProfiles/" is used; the two-target
    # unpacking rejects names that do not contain the separator exactly once.
    profile_name_parts = args["connection_profile_name"].split(
        "/connectionProfiles/"
    )
    _, profile_simple_name = profile_name_parts

    discover_filename = DATASTREAM_DISCOVER_RESULT_FILENAME_TEMPLATE.format(
        connection_profile_name=profile_simple_name,
        schema_name=args["source_schema_name"],
        table_name=args["source_table_name"],
    )
    args["discover_result_filepath"] = os.path.join(
        DATASTREAM_DISCOVER_RESULT_DIRECTORY, discover_filename
    )

    # "<project>.<dataset>.<table>" identifiers for both BigQuery tables.
    project_id = args["project_id"]
    target_table_fqn = (
        f"{project_id}"
        f".{args['bigquery_target_dataset_name']}"
        f".{args['bigquery_target_table_name']}"
    )
    source_table_fqn = (
        f"{project_id}"
        f".{args['bigquery_source_dataset_name']}"
        f".{args['bigquery_source_table_name']}"
    )

    fetch_ddl_filename = FETCH_BIGQUERY_TABLE_DDL_FILENAME_TEMPLATE.format(
        table_name=source_table_fqn
    )
    args["fetch_bigquery_source_table_ddl_filepath"] = os.path.join(
        FETCH_BIGQUERY_TABLE_DDL_DIRECTORY, fetch_ddl_filename
    )

    target_ddl_filename = CREATE_TARGET_TABLE_DDL_FILENAME_TEMPLATE.format(
        table_name=target_table_fqn
    )
    args["create_target_table_ddl_filepath"] = os.path.join(
        CREATE_TARGET_TABLE_DDL_DIRECTORY, target_ddl_filename
    )

    source_ddl_filename = SOURCE_TABLE_DDL_FILENAME_TEMPLATE.format(
        table_name=source_table_fqn
    )
    args["create_source_table_ddl_filepath"] = os.path.join(
        SOURCE_TABLE_DDL_DIRECTORY, source_ddl_filename
    )

    copy_rows_filename = COPY_ROWS_FILENAME_TEMPLATE.format(
        source_table=source_table_fqn,
        destination_table=target_table_fqn,
    )
    args["copy_rows_filepath"] = os.path.join(
        COPY_ROWS_DIRECTORY, copy_rows_filename
    )