migration_toolkit/executors/get_stream.py (42 lines of code) (raw):

# Copyright 2023 Google LLC # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. import json import logging import sys from common.monitoring_consts import USER_AGENT from google.api_core.exceptions import NotFound from google.api_core.gapic_v1.client_info import ClientInfo from google.cloud import datastream_v1 from google.cloud.datastream_v1.types import Stream logger = logging.getLogger(__name__) def _pb_to_json(pb): return json.loads(type(pb).to_json(pb)) def execute_get_stream( project_id: str, datastream_region: str, stream_id: str, datastream_api_endpoint_override: str, ) -> Stream: client_options = ( {"api_endpoint": datastream_api_endpoint_override} if datastream_api_endpoint_override else {} ) logger.info( f"Calling get on stream '{stream_id}', region '{datastream_region}'," f" project '{project_id}', client options '{client_options}'.." ) client = datastream_v1.DatastreamClient( client_options=client_options, client_info=ClientInfo(user_agent=USER_AGENT), ) parent = f"projects/{project_id}/locations/{datastream_region}" fully_qualified_stream_id = f"{parent}/streams/{stream_id}" request = datastream_v1.GetStreamRequest(name=fully_qualified_stream_id) try: stream: Stream = client.get_stream(request=request) except NotFound: logger.error( f"ERROR: Stream '{stream_id}' not found. Make sure the stream exists" " before starting the migration." ) sys.exit(1) return stream