backend-printing/helper/storage.py (74 lines of code) (raw):

"""Storage client to interact with the Azure Storage Account.""" import os from uuid import uuid4 import json from azure.storage.queue import QueueMessage from helper.azure_client import AzureClient from helper.constants import MAX_ITEMS_TO_FETCH, MESSAGE_EXPIRY_TIME class StorageQueueClient(AzureClient): def _store_document_to_blob(self, blob_guid, message): """Store the document to the blob Args: blob_guid (string): container guid document (dict): document to store """ try: blob_client = self.get_blob_client(blob_guid) blob_client.upload_blob(data=json.dumps(message).encode("utf-8")) return blob_client.url except Exception as e: raise e def _get_document_from_blob(self, message_content): """Get the document from the blob Args: message_content (dict): blob guid and url """ try: blob_url, blob_guid = ( message_content["blob_url"], message_content["blob_guid"], ) blob_client = self.get_blob_client(blob_guid) blob_client.from_blob_url(blob_url) return blob_client.download_blob().readall() except Exception as e: raise e def send_message(self, message): """Send a message to the queue Args: message (string): message to send """ try: encoded_message = json.dumps(message).encode("utf-8") # if message size is greater than 60KB, store it in blob # and send the blob url in the message if len(encoded_message) > 60 * 1024: blob_guid = str(uuid4()) blob_url = self._store_document_to_blob(blob_guid, message) encoded_message = json.dumps( {"blob_guid": blob_guid, "blob_url": blob_url} ).encode("utf-8") send_message_response = self.storage_queue_client.send_message( content=encoded_message, time_to_live=MESSAGE_EXPIRY_TIME, ) return send_message_response except Exception as e: raise Exception(f"Error occurred while sending message: {e}") def receive_messages(self) -> list[dict]: """Receive messages from the queue Returns: list[QueueMessage]: list of messages """ return_messages = [] try: raw_messages = self.storage_queue_client.receive_messages( max_messages=MAX_ITEMS_TO_FETCH ) for messages in raw_messages.by_page(): for message in messages: message_content = json.loads( message.content.decode("utf-8").replace("'", '"') ) if "blob_guid" in message_content: blob_data = self._get_document_from_blob(message_content) message_content = json.loads( blob_data.decode("utf-8").replace("'", '"') ) return_messages.append((message, message_content)) return return_messages except Exception as e: raise Exception(f"Error occurred while receiving messages: {e}") def delete_message(self, message): """Delete a message from the queue Args: message (QueueMessage): message to delete """ try: return self.storage_queue_client.delete_message( pop_receipt=message.pop_receipt, message=message ) except Exception as e: raise Exception(f"Error occurred while deleting message: {e}") class TableStorageClient(AzureClient): def put_entity(self, table_name, entity): """Put an entity to the table Args: table_name (string): table name entity (dict): entity to put """ try: return self.table_service_client.upsert_entity(entity=entity) except Exception as e: raise Exception(f"Error occurred while putting entity: {e}")