pathology/transformation_pipeline/ingestion_lib/pubsub_util.py (31 lines of code) (raw):

# Copyright 2023 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ==============================================================================
"""Utility functions for pub/sub."""
import functools

from google.api_core import exceptions as google_exceptions
from google.cloud import pubsub_v1

from pathology.shared_libs.logging_lib import cloud_logging_client
from pathology.transformation_pipeline.ingestion_lib import ingest_const


@functools.cache
def validate_topic_exists(topic_name: str) -> None:
  """Checks that a pub/sub topic can be retrieved.

  Opens a publisher client and requests the topic. Any failure is logged at
  critical level, tagged with the topic name, and then re-raised unchanged.

  The function is wrapped in functools.cache: once a topic name has validated
  successfully, later calls with the same name return without querying again.
  Calls that raise are not cached, so a failed validation is retried on the
  next call.

  Args:
    topic_name: Name of pub/sub topic.

  Raises:
    google_exceptions.NotFound: Pub/sub topic is not defined.
    google_exceptions.PermissionDenied: Permission denied accessing topic.
    Exception: Any other error raised while requesting the topic.
  """
  with pubsub_v1.PublisherClient() as client:
    try:
      client.get_topic(topic=topic_name)
    except Exception as error:
      # Select the log message by failure type; NotFound is tested first,
      # then PermissionDenied, and everything else is reported as unexpected.
      if isinstance(error, google_exceptions.NotFound):
        message = 'Pub/sub topic is not defined.'
      elif isinstance(error, google_exceptions.PermissionDenied):
        message = 'Permission denied accessing pub/sub topic.'
      else:
        message = 'Unexpected exception accessing pub/sub topic.'
      log_fields = {ingest_const.LogKeywords.PUBSUB_TOPIC_NAME: topic_name}
      cloud_logging_client.critical(message, log_fields, error)
      # Bare raise keeps the original exception and traceback for callers.
      raise