# code/embedding-function/utilities/helpers/azure_blob_storage_client.py
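"""Azure Blob Storage helper used by the embedding function.

Wraps BlobServiceClient for uploading, downloading, listing, and deleting
documents, and generates SAS URLs for them. Authentication is either RBAC
(DefaultAzureCredential plus a user delegation key) or a storage account key,
selected via EnvHelper's AZURE_AUTH_TYPE.
"""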

import mimetypes
from typing import Optional
from datetime import datetime, timedelta
from azure.storage.blob import (
    BlobServiceClient,
    generate_blob_sas,
    generate_container_sas,
    ContentSettings,
    UserDelegationKey,
)
from azure.core.credentials import AzureNamedKeyCredential
from azure.storage.queue import QueueClient, BinaryBase64EncodePolicy
import chardet
from .env_helper import EnvHelper
from azure.identity import DefaultAzureCredential


def connection_string(account_name: str, account_key: str):
    return f"DefaultEndpointsProtocol=https;AccountName={account_name};AccountKey={account_key};EndpointSuffix=core.windows.net"


def create_queue_client():
    env_helper: EnvHelper = EnvHelper()
    if env_helper.AZURE_AUTH_TYPE == "rbac":
        return QueueClient(
            account_url=f"https://{env_helper.AZURE_BLOB_ACCOUNT_NAME}.queue.core.windows.net/",
            queue_name=env_helper.DOCUMENT_PROCESSING_QUEUE_NAME,
            credential=DefaultAzureCredential(),
            message_encode_policy=BinaryBase64EncodePolicy(),
        )
    else:
        return QueueClient.from_connection_string(
            conn_str=connection_string(
                env_helper.AZURE_BLOB_ACCOUNT_NAME, env_helper.AZURE_BLOB_ACCOUNT_KEY
            ),
            queue_name=env_helper.DOCUMENT_PROCESSING_QUEUE_NAME,
            message_encode_policy=BinaryBase64EncodePolicy(),
        )


class AzureBlobStorageClient:
    def __init__(
        self,
        account_name: Optional[str] = None,
        account_key: Optional[str] = None,
        container_name: Optional[str] = None,
    ):
        env_helper: EnvHelper = EnvHelper()

        self.auth_type = env_helper.AZURE_AUTH_TYPE
        self.account_name = account_name or env_helper.AZURE_BLOB_ACCOUNT_NAME
        self.container_name = container_name or env_helper.AZURE_BLOB_CONTAINER_NAME
        self.endpoint = env_helper.AZURE_STORAGE_ACCOUNT_ENDPOINT
        if self.auth_type == "rbac":
            # RBAC: authenticate with Azure AD and sign SAS tokens with a user delegation key.
            self.account_key = None
            self.blob_service_client = BlobServiceClient(
                account_url=self.endpoint, credential=DefaultAzureCredential()
            )
            self.user_delegation_key = self.request_user_delegation_key(
                blob_service_client=self.blob_service_client
            )
        else:
            # Shared key: authenticate and sign SAS tokens with the storage account key.
            self.account_key = account_key or env_helper.AZURE_BLOB_ACCOUNT_KEY
            self.blob_service_client = BlobServiceClient(
                self.endpoint,
                credential=AzureNamedKeyCredential(
                    name=self.account_name, key=self.account_key
                ),
            )
            self.user_delegation_key = None

    def request_user_delegation_key(
        self, blob_service_client: BlobServiceClient
    ) -> UserDelegationKey:
        # Get a user delegation key that's valid for 1 day
        delegation_key_start_time = datetime.utcnow()
        delegation_key_expiry_time = delegation_key_start_time + timedelta(days=1)

        user_delegation_key = blob_service_client.get_user_delegation_key(
            key_start_time=delegation_key_start_time,
            key_expiry_time=delegation_key_expiry_time,
        )
        return user_delegation_key

    def file_exists(self, file_name):
        blob_client = self.blob_service_client.get_blob_client(
            container=self.container_name, blob=file_name
        )
        return blob_client.exists()

    def upload_file(
        self,
        bytes_data,
        file_name,
        content_type: Optional[str] = None,
        metadata: Optional[dict[str, str]] = None,
    ):
        # Create a blob client using the local file name as the name for the blob
        blob_client = self.blob_service_client.get_blob_client(
            container=self.container_name, blob=file_name
        )
        content_settings = ContentSettings(content_type=content_type)
        if content_type is None:
            # Guess the content type from the file name; for plain text, detect the charset too.
            content_type = mimetypes.MimeTypes().guess_type(file_name)[0]
            charset = (
                f"; charset={chardet.detect(bytes_data)['encoding']}"
                if content_type == "text/plain"
                else ""
            )
            content_type = content_type if content_type is not None else "text/plain"
            content_settings = ContentSettings(content_type=content_type + charset)
        # Upload the created file
        blob_client.upload_blob(
            bytes_data,
            overwrite=True,
            content_settings=content_settings,
            metadata=metadata,
        )
        # Generate a SAS URL to the blob and return it. If auth_type is rbac,
        # account_key is None; otherwise, user_delegation_key is None.
        return (
            blob_client.url
            + "?"
            + generate_blob_sas(
                self.account_name,
                self.container_name,
                file_name,
                user_delegation_key=self.user_delegation_key,
                account_key=self.account_key,
                permission="r",
                expiry=datetime.utcnow() + timedelta(hours=3),
            )
        )

    def download_file(self, file_name):
        blob_client = self.blob_service_client.get_blob_client(
            container=self.container_name, blob=file_name
        )
        return blob_client.download_blob().readall()

    def delete_file(self, file_name):
        """
        Deletes a file from the Azure Blob Storage container.

        Args:
            file_name (str): The name of the file to delete.

        Returns:
            None
        """
        blob_client = self.blob_service_client.get_blob_client(
            container=self.container_name, blob=file_name
        )
        if blob_client.exists():
            blob_client.delete_blob()

    def delete_files(self, files, integrated_vectorization: bool):
        """
        Deletes files from the Azure Blob Storage container.

        Args:
            files (dict): Mapping of file names to their ids; only the names are used here.
            integrated_vectorization (bool): When False, only the file's base name is used
                to locate the blob.

        Returns:
            None
        """
        for filename, ids in files.items():
            if not integrated_vectorization:
                filename = filename.split("/")[-1]
            self.delete_file(filename)

    def get_all_files(self):
        # Get all files in the container from Azure Blob Storage
        container_client = self.blob_service_client.get_container_client(
            self.container_name
        )
        blob_list = container_client.list_blobs(include="metadata")
        # sas = generate_blob_sas(account_name, container_name, blob.name, account_key=account_key, permission="r", expiry=datetime.utcnow() + timedelta(hours=3))
        sas = generate_container_sas(
            self.account_name,
            self.container_name,
            user_delegation_key=self.user_delegation_key,
            account_key=self.account_key,
            permission="r",
            expiry=datetime.utcnow() + timedelta(hours=3),
        )
        files = []
        converted_files = {}
        for blob in blob_list:
            if not blob.name.startswith("converted/"):
                files.append(
                    {
                        "filename": blob.name,
                        "converted": (
                            blob.metadata.get("converted", "false") == "true"
                            if blob.metadata
                            else False
                        ),
                        "embeddings_added": (
                            blob.metadata.get("embeddings_added", "false") == "true"
                            if blob.metadata
                            else False
                        ),
                        "fullpath": f"{self.endpoint}{self.container_name}/{blob.name}?{sas}",
                        "converted_filename": (
                            blob.metadata.get("converted_filename", "")
                            if blob.metadata
                            else ""
                        ),
                        "converted_path": "",
                    }
                )
            else:
                converted_files[blob.name] = (
                    f"{self.endpoint}{self.container_name}/{blob.name}?{sas}"
                )

        # Link each source file to its converted counterpart, if one exists.
        for file in files:
            converted_filename = file.pop("converted_filename", "")
            if converted_filename in converted_files:
                file["converted"] = True
                file["converted_path"] = converted_files[converted_filename]

        return files

    def upsert_blob_metadata(self, file_name, metadata):
        blob_client = self.blob_service_client.get_blob_client(
            container=self.container_name, blob=file_name
        )
        # Read metadata from the blob
        blob_metadata = blob_client.get_blob_properties().metadata
        # Update metadata
        blob_metadata.update(metadata)
        # Add metadata to the blob
        blob_client.set_blob_metadata(metadata=blob_metadata)

    def get_container_sas(self):
        # Generate a SAS URL to the container and return it
        return "?" + generate_container_sas(
            account_name=self.account_name,
            container_name=self.container_name,
            user_delegation_key=self.user_delegation_key,
            account_key=self.account_key,
            permission="r",
            expiry=datetime.utcnow() + timedelta(days=365 * 5),
        )

    def get_blob_sas(self, file_name):
        # Generate a SAS URL to the blob and return it
        return (
            f"{self.endpoint}{self.container_name}/{file_name}"
            + "?"
            + generate_blob_sas(
                account_name=self.account_name,
                container_name=self.container_name,
                blob_name=file_name,
                user_delegation_key=self.user_delegation_key,
                account_key=self.account_key,
                permission="r",
                expiry=datetime.utcnow() + timedelta(hours=1),
            )
        )
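

# Illustrative usage (not part of the original module; assumes the AZURE_BLOB_*
# and AZURE_AUTH_TYPE settings are available to EnvHelper via the environment):
#
#   client = AzureBlobStorageClient()
#   sas_url = client.upload_file(b"hello world", "hello.txt", content_type="text/plain")
#   client.upsert_blob_metadata("hello.txt", {"embeddings_added": "true"})
#   files = client.get_all_files()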