# migration_toolkit/migration_config.py
# Copyright 2023 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import argparse
from argparse import RawTextHelpFormatter
import json
import logging
import os
import sys
from common import argparse_arguments
from common import name_mapper
from common.logging_config import configure_logging
from common.output_names import *  # directory and filename constants used in _get_filepaths
from common.source_type import SourceType
from executors.get_stream import execute_get_stream
from google.cloud.datastream_v1.types import Stream
logger = logging.getLogger(__name__)
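
# Builds the complete migration configuration: parses the CLI arguments,
# fetches the Datastream stream, merges in the values derived from the
# stream, and resolves the output file paths.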
def get_config() -> argparse.Namespace:
user_args = _get_user_args()
configure_logging(user_args.verbose)
stream: Stream = execute_get_stream(
project_id=user_args.project_id,
datastream_region=user_args.datastream_region,
stream_id=user_args.stream_id,
datastream_api_endpoint_override=user_args.datastream_api_endpoint_override,
)
args_from_stream = _get_args_from_stream(stream=stream, user_args=user_args)
all_args = vars(user_args) | args_from_stream
_get_filepaths(all_args)
return argparse.Namespace(**all_args)
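
# Parses and returns the command-line arguments. The individual flag
# definitions live in common.argparse_arguments.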
def _get_user_args():
parser = argparse.ArgumentParser(
description="Datastream BigQuery Migration Toolkit arguments",
formatter_class=RawTextHelpFormatter,
)
argparse_arguments.migration_mode(parser)
argparse_arguments.force(parser)
argparse_arguments.verbose(parser)
argparse_arguments.datastream_api_endpoint_override(parser)
required_args_parser = parser.add_argument_group("required arguments")
argparse_arguments.project_id(required_args_parser)
argparse_arguments.stream_id(required_args_parser)
argparse_arguments.datastream_region(required_args_parser)
argparse_arguments.source_schema_name(required_args_parser)
argparse_arguments.source_table_name(required_args_parser)
argparse_arguments.bigquery_source_dataset_name(required_args_parser)
argparse_arguments.bigquery_source_table_name(required_args_parser)
return parser.parse_args()
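
# Derives additional configuration from the Datastream stream: connection
# profile, source type, BigQuery max staleness, and the target dataset and
# table names. Exits if the stream is not PAUSED or has no BigQuery
# destination.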
def _get_args_from_stream(stream: Stream, user_args: argparse.Namespace):
args_from_stream = {}
stream_name = stream.display_name
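# The migration must run against a paused stream; fail fast otherwise.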
if stream.state != Stream.State.PAUSED:
logger.error(
f"ERROR: Stream '{stream_name}' should be in state PAUSED, but it is in"
f" state {stream.state.name}. Please pause the stream and run the"
" migration again."
)
sys.exit(1)
args_from_stream["stream"] = stream
args_from_stream["connection_profile_name"] = (
stream.source_config.source_connection_profile
)
args_from_stream["source_type"] = _source_config_to_source_type(
stream.source_config
)
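# Only streams that write to BigQuery can be migrated.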
if not hasattr(stream.destination_config, "bigquery_destination_config"):
logger.error(
f"ERROR: Stream '{stream_name}' doesn't have BigQuery destination."
" Recreate the stream with BigQuery destination and run the migration"
" again."
)
sys.exit(1)
args_from_stream["bigquery_max_staleness_seconds"] = (
stream.destination_config.bigquery_destination_config.data_freshness.seconds
)
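# Source hierarchy streams create one dataset per source schema from a
# dataset template, so the target names are derived from that template.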
if hasattr(
stream.destination_config.bigquery_destination_config,
"source_hierarchy_datasets",
):
logger.debug(f"Stream {stream_name} is a source hierarchy stream.")
args_from_stream["bigquery_region"] = getattr(
stream.destination_config.bigquery_destination_config.source_hierarchy_datasets.dataset_template,
"location",
None,
)
dataset_id_prefix = getattr(
stream.destination_config.bigquery_destination_config.source_hierarchy_datasets.dataset_template,
"dataset_id_prefix",
None,
)
args_from_stream["bigquery_kms_key_name"] = getattr(
stream.destination_config.bigquery_destination_config.source_hierarchy_datasets.dataset_template,
"kms_key_name",
None,
)
args_from_stream["bigquery_target_dataset_name"] = (
name_mapper.dynamic_datasets_dataset_name(
dataset_id_prefix=dataset_id_prefix,
source_schema_name=user_args.source_schema_name,
)
)
args_from_stream["bigquery_target_table_name"] = (
name_mapper.dynamic_datasets_table_name(
source_table_name=user_args.source_table_name
)
)
args_from_stream["single_target_stream"] = False
else:
args_from_stream["bigquery_target_dataset_name"] = (
stream.destination_config.bigquery_destination_config.single_target_dataset.dataset_id
)
args_from_stream["bigquery_target_table_name"] = (
name_mapper.single_dataset_table_name(
source_schema_name=user_args.source_schema_name,
source_table_name=user_args.source_table_name,
)
)
args_from_stream["single_target_stream"] = True
return args_from_stream
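
# Determines the source type from the stream's source configuration: the
# presence of a mysqlSourceConfig field means MySQL, otherwise Oracle is
# assumed.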
def _source_config_to_source_type(source_config):
source_config_json = json.loads(type(source_config).to_json(source_config))
return (
SourceType.MYSQL
if source_config_json.get("mysqlSourceConfig") is not None
else SourceType.ORACLE
)
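
# Resolves the file paths where each migration step writes its output
# (discover result, DDL statements, and the row copy). Directory and
# filename templates come from common.output_names.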
def _get_filepaths(args):
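# Keep only the connection profile ID from its full resource name.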
_, connection_profile_simple_name = args["connection_profile_name"].split(
"/connectionProfiles/"
)
args["discover_result_filepath"] = os.path.join(
DATASTREAM_DISCOVER_RESULT_DIRECTORY,
DATASTREAM_DISCOVER_RESULT_FILENAME_TEMPLATE.format(
connection_profile_name=connection_profile_simple_name,
schema_name=args["source_schema_name"],
table_name=args["source_table_name"],
),
)
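# Fully qualified table names (project.dataset.table) are embedded in the
# generated filenames below.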
bigquery_target_table_fully_qualified_name = f"{args['project_id']}.{args['bigquery_target_dataset_name']}.{args['bigquery_target_table_name']}"
bigquery_source_table_fully_qualified_name = f"{args['project_id']}.{args['bigquery_source_dataset_name']}.{args['bigquery_source_table_name']}"
args["fetch_bigquery_source_table_ddl_filepath"] = os.path.join(
FETCH_BIGQUERY_TABLE_DDL_DIRECTORY,
FETCH_BIGQUERY_TABLE_DDL_FILENAME_TEMPLATE.format(
table_name=bigquery_source_table_fully_qualified_name
),
)
args["create_target_table_ddl_filepath"] = os.path.join(
CREATE_TARGET_TABLE_DDL_DIRECTORY,
CREATE_TARGET_TABLE_DDL_FILENAME_TEMPLATE.format(
table_name=bigquery_target_table_fully_qualified_name
),
)
args["create_source_table_ddl_filepath"] = os.path.join(
SOURCE_TABLE_DDL_DIRECTORY,
SOURCE_TABLE_DDL_FILENAME_TEMPLATE.format(
table_name=bigquery_source_table_fully_qualified_name
),
)
args["copy_rows_filepath"] = os.path.join(
COPY_ROWS_DIRECTORY,
COPY_ROWS_FILENAME_TEMPLATE.format(
source_table=bigquery_source_table_fully_qualified_name,
destination_table=bigquery_target_table_fully_qualified_name,
),
)