in migration_toolkit/migration_config.py [0:0]
def _get_args_from_stream(stream: Stream, user_args):
    """Collect migration arguments that can be read off an existing stream.

    The stream must be paused and must expose a BigQuery destination config;
    otherwise an error is logged and the process exits with status 1.

    Args:
        stream: The stream to inspect. Its state, source config and destination
            config are read.
        user_args: Parsed user arguments. Only ``source_schema_name`` and
            ``source_table_name`` are read here, to derive the target names.

    Returns:
        A dict with the keys ``stream``, ``connection_profile_name``,
        ``source_type``, ``bigquery_max_staleness_seconds``,
        ``bigquery_target_dataset_name``, ``bigquery_target_table_name`` and
        ``single_target_stream``. When the destination uses source hierarchy
        datasets, ``bigquery_region`` and ``bigquery_kms_key_name`` are
        included as well (each may be ``None``).

    Raises:
        SystemExit: If the stream is not in state PAUSED, or if it has no
            BigQuery destination config.
    """
    stream_name = stream.display_name

    # Refuse to continue unless the stream is paused.
    if stream.state != Stream.State.PAUSED:
        logger.error(
            f"ERROR: Stream '{stream_name}' should be in state PAUSED, but it is in"
            f" state {stream.state.name}. Please pause the stream and run the"
            " migration again."
        )
        sys.exit(1)

    source_config = stream.source_config
    derived = {
        "stream": stream,
        "connection_profile_name": source_config.source_connection_profile,
        "source_type": _source_config_to_source_type(source_config),
    }

    # NOTE(review): hasattr() only discriminates if an unset field raises
    # AttributeError on this type -- confirm that holds for Stream's
    # destination config (same question for the hasattr() check below).
    destination_config = stream.destination_config
    if not hasattr(destination_config, "bigquery_destination_config"):
        logger.error(
            f"ERROR: Stream '{stream_name}' doesn't have BigQuery destination."
            " Recreate the stream with BigQuery destination and run the migration"
            " again."
        )
        sys.exit(1)

    bq_config = destination_config.bigquery_destination_config
    derived["bigquery_max_staleness_seconds"] = bq_config.data_freshness.seconds

    if not hasattr(bq_config, "source_hierarchy_datasets"):
        # Single target dataset: the dataset id comes straight from the stream
        # and the table name is derived from both schema and table.
        derived["bigquery_target_dataset_name"] = (
            bq_config.single_target_dataset.dataset_id
        )
        derived["bigquery_target_table_name"] = (
            name_mapper.single_dataset_table_name(
                source_schema_name=user_args.source_schema_name,
                source_table_name=user_args.source_table_name,
            )
        )
        derived["single_target_stream"] = True
        return derived

    # Source hierarchy datasets: region, KMS key and the dataset prefix are
    # taken from the dataset template, each defaulting to None when absent.
    logger.debug(f"Stream {stream_name} is a source hierarchy stream.")
    template = bq_config.source_hierarchy_datasets.dataset_template
    derived["bigquery_region"] = getattr(template, "location", None)
    dataset_id_prefix = getattr(template, "dataset_id_prefix", None)
    derived["bigquery_kms_key_name"] = getattr(template, "kms_key_name", None)
    derived["bigquery_target_dataset_name"] = (
        name_mapper.dynamic_datasets_dataset_name(
            dataset_id_prefix=dataset_id_prefix,
            source_schema_name=user_args.source_schema_name,
        )
    )
    derived["bigquery_target_table_name"] = (
        name_mapper.dynamic_datasets_table_name(
            source_table_name=user_args.source_table_name
        )
    )
    derived["single_target_stream"] = False
    return derived