migration_toolkit/executors/discover.py (72 lines of code) (raw):

# Copyright 2023 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Run Datastream "discover" on a connection profile and save the result."""

import json
import logging
from typing import Dict

from common.file_writer import write_json
from common.monitoring_consts import USER_AGENT
from common.source_type import SourceType
from google.api_core.gapic_v1.client_info import ClientInfo
from google.cloud import datastream_v1

logger = logging.getLogger(__name__)

# The discover request nests its table filter under field names that differ
# per source type. The four tables below give, for each supported source
# type, the field name to use at each nesting level.

# Top-level request field holding the source-specific object tree.
SOURCE_TYPE_TO_RDBMS: Dict[SourceType, str] = {
    SourceType.MYSQL: "mysql_rdbms",
    SourceType.ORACLE: "oracle_rdbms",
}

# Field holding the list of schema (database) entries.
SOURCE_TYPE_TO_SCHEMAS: Dict[SourceType, str] = {
    SourceType.MYSQL: "mysql_databases",
    SourceType.ORACLE: "oracle_schemas",
}

# Field holding a single schema (database) name.
SOURCE_TYPE_TO_SCHEMA: Dict[SourceType, str] = {
    SourceType.MYSQL: "database",
    SourceType.ORACLE: "schema",
}

# Field holding the list of table entries inside a schema entry.
SOURCE_TYPE_TO_TABLES: Dict[SourceType, str] = {
    SourceType.MYSQL: "mysql_tables",
    SourceType.ORACLE: "oracle_tables",
}

# Separator between the parent resource and the profile id in a full
# connection profile resource name.
_CONNECTION_PROFILES_SEGMENT = "/connectionProfiles/"


def _pb_to_json(pb):
    """Return ``pb`` as plain Python data parsed from its JSON serialization.

    The message is serialized with its own class's ``to_json`` and the
    resulting text is parsed back with ``json.loads``.
    """
    return json.loads(type(pb).to_json(pb))


def _build_data_object(
    source_type: SourceType, schema: str, table: str
) -> Dict[str, dict]:
    """Build the request keyword argument selecting one table of one schema.

    Args:
        source_type: Key into the ``SOURCE_TYPE_TO_*`` tables; it decides
            which field names are used at every nesting level.
        schema: Schema (database) name to place in the single schema entry.
        table: Table name to place in the single table entry.

    Returns:
        A one-key dict, suitable for ``**`` expansion into the discover
        request, of the form
        ``{<rdbms>: {<schemas>: [{<schema>: schema, <tables>: [{"table": table}]}]}}``.

    Raises:
        KeyError: If ``source_type`` is missing from the mapping tables.
    """
    table_entry = {"table": table}
    schema_entry = {
        SOURCE_TYPE_TO_SCHEMA[source_type]: schema,
        SOURCE_TYPE_TO_TABLES[source_type]: [table_entry],
    }
    return {
        SOURCE_TYPE_TO_RDBMS[source_type]: {
            SOURCE_TYPE_TO_SCHEMAS[source_type]: [schema_entry],
        }
    }


def _parent_from_profile_name(connection_profile_name: str) -> str:
    """Return the part of the resource name before ``/connectionProfiles/``.

    Raises:
        ValueError: If the separator does not occur exactly once in the name.
    """
    parts = connection_profile_name.split(_CONNECTION_PROFILES_SEGMENT)
    # Exactly one separator must be present; anything else used to surface
    # as an opaque tuple-unpacking ValueError.
    if len(parts) != 2:
        raise ValueError(
            f"Invalid connection profile name '{connection_profile_name}': "
            "expected '<parent>/connectionProfiles/<id>'."
        )
    return parts[0]


def execute_discover(
    connection_profile_name: str,
    source_type: SourceType,
    source_table_name: str,
    source_schema_name: str,
    datastream_api_endpoint_override: str,
    filepath: str,
) -> None:
    """Call discover for one source table and write the response to a file.

    Args:
        connection_profile_name: Full resource name of the connection
            profile; it must contain ``/connectionProfiles/`` exactly once.
        source_type: Source database type, used to shape the request.
        source_table_name: Table to include in the discover request.
        source_schema_name: Schema (database) containing that table.
        datastream_api_endpoint_override: API endpoint to use instead of
            the client's default; a falsy value keeps the default.
        filepath: Destination passed to ``write_json`` for the response.

    Raises:
        ValueError: If ``connection_profile_name`` is malformed.
        KeyError: If ``source_type`` is missing from the mapping tables.
    """
    logger.info(
        "Calling discover on connection profile '%s'..",
        connection_profile_name,
    )

    # Only override the endpoint when one was actually supplied.
    client_options = (
        {"api_endpoint": datastream_api_endpoint_override}
        if datastream_api_endpoint_override
        else {}
    )
    client = datastream_v1.DatastreamClient(
        client_options=client_options,
        client_info=ClientInfo(user_agent=USER_AGENT),
    )

    parent = _parent_from_profile_name(connection_profile_name)

    request = datastream_v1.DiscoverConnectionProfileRequest(
        connection_profile_name=connection_profile_name,
        full_hierarchy=True,
        parent=parent,
        **_build_data_object(
            source_type=source_type,
            schema=source_schema_name,
            table=source_table_name,
        ),
    )

    response = client.discover_connection_profile(request=request)
    write_json(filepath=filepath, data=_pb_to_json(response))