migration_toolkit/executors/update_stream.py (37 lines of code) (raw):
# Copyright 2023 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
import sys
from common.monitoring_consts import USER_AGENT
from google.api_core.exceptions import NotFound
from google.api_core.gapic_v1.client_info import ClientInfo
from google.cloud import datastream_v1
from google.cloud.datastream_v1.types import Stream
logger = logging.getLogger(__name__)
def execute_update_stream(
stream: Stream,
datastream_api_endpoint_override: str,
) -> Stream:
client_options = (
{"api_endpoint": datastream_api_endpoint_override}
if datastream_api_endpoint_override
else {}
)
logger.info(
f"Calling update on stream '{stream.display_name}', client options"
f" '{client_options}'.."
)
client = datastream_v1.DatastreamClient(
client_options=client_options,
client_info=ClientInfo(user_agent=USER_AGENT),
)
request = datastream_v1.UpdateStreamRequest(stream=stream)
try:
operation = client.update_stream(request=request)
res = operation.result()
except NotFound:
logger.error(
f"ERROR: Stream '{stream.display_name}' not found. Make sure the stream"
" exists before starting the migration."
)
sys.exit(1)
logging.debug(f"Got result {res}")
return res