ingestion-edge/ingestion_edge/util.py (60 lines of code) (raw):

"""Utilities.""" from google.cloud.pubsub_v1.gapic.publisher_client import PublisherClient from google.cloud.pubsub_v1.publisher.exceptions import PublishError from google.cloud.pubsub_v1.types import BatchSettings, PublishRequest, PubsubMessage from typing import List, Optional import asyncio class AsyncioBatch: """Batch for google.cloud.pubsub_v1.PublisherClient in asyncio. Workaround for https://github.com/googleapis/google-cloud-python/issues/7104 """ def __init__( self, client: PublisherClient, topic: str, settings: BatchSettings, autocommit: bool = True, ): """Initialize.""" self.client = client self.topic = topic self.settings = settings self.full = asyncio.Event() self.messages: List[PubsubMessage] = [] # fix https://github.com/googleapis/google-cloud-python/issues/7108 self.size = PublishRequest(topic=topic, messages=[]).ByteSize() # Create a task to commit when full self.result = asyncio.create_task(self.commit()) # If max latency is specified start a task to monitor the batch # and commit when max latency is reached if autocommit and self.settings.max_latency < float("inf"): asyncio.create_task(self.monitor()) async def monitor(self): """Sleep until max latency is reached then set the batch to full.""" await asyncio.sleep(self.settings.max_latency) self.full.set() async def commit(self) -> List[str]: """Publish this batch when full.""" await self.full.wait() if not self.messages: return [] response = await asyncio.get_running_loop().run_in_executor( None, self.client.api.publish, self.topic, self.messages ) if len(response.message_ids) != len(self.messages): raise PublishError("Some messages not successfully published") return response.message_ids def publish(self, message: PubsubMessage) -> Optional[asyncio.Task]: """Asynchronously publish message.""" # check if batch is not full if not self.full.is_set(): # check if batch can accept message index = len(self.messages) new_size = self.size + PublishRequest(messages=[message]).ByteSize() overflow = ( new_size > self.settings.max_bytes or index + 1 > self.settings.max_messages ) if overflow: # fix https://github.com/googleapis/google-cloud-python/issues/7107 if not self.messages: raise ValueError("Message exceeds max bytes") # batch is full because it could not accept message self.full.set() else: # Store message in the batch. self.messages.append(message) self.size = new_size # return a task to await for the message id return asyncio.create_task(self.message_id(index)) return None # the batch cannot accept a message async def message_id(self, index: int) -> str: """Get message id from result by index.""" return (await self.result)[index] class HTTP_STATUS: """HTTP Status Codes for responses.""" OK = 200 BAD_REQUEST = 400 PAYLOAD_TOO_LARGE = 413 REQUEST_HEADER_FIELDS_TOO_LARGE = 431 INSUFFICIENT_STORAGE = 507