connectors/images_deleted_files_purger.py (55 lines of code) (raw):

import os import logging from tools import BlobContainerClient from tools import AISearchClient class ImagesDeletedFilesPurger: """ Identifies images in the 'documents-images' container that are no longer referenced in the Azure AI Search index (via 'relatedImages'). If an image is not present in any document's 'relatedImages', it is deleted. """ def __init__(self): """ Initialize with environment variables and any other configuration. """ self.index_name = os.getenv("AZURE_SEARCH_INDEX_NAME", "ragindex") self.container_name = os.getenv("STORAGE_CONTAINER_IMAGES", "documents-images") self.storage_account_name = os.getenv("STORAGE_ACCOUNT_NAME") self.blob_base_url = f"https://{self.storage_account_name}.blob.core.windows.net" # Warn if some env vars are missing if not all([self.index_name, self.container_name, self.storage_account_name]): logging.warning("[images_deleted_files_purger] Missing or incomplete environment variables.") # We'll create a single AISearchClient for searching the index self.ai_search = AISearchClient() async def run(self): """ Executes the purge process (asynchronously): 1) Gathers all referenced images from the 'relatedImages' field in the search index. 2) Lists all blobs in the container. 3) Deletes those not referenced in step 1. """ logging.info("[images_deleted_files_purger] Starting images from deleted files purging run()") # 1. Collect all referenced images from Azure AI Search referenced_images = await self._get_all_referenced_images() # 2. Purge unreferenced images from the container await self._purge_unreferenced_images(referenced_images) logging.info("[images_deleted_files_purger] Completed run().") # Optionally close the AISearchClient await self.ai_search.close() async def _get_all_referenced_images(self) -> set: """ Uses AISearchClient to retrieve 'relatedImages' from all docs in the index and returns a set of URLs. """ logging.info("[images_deleted_files_purger] Retrieving referenced images from AI Search index...") referenced_images = set() try: # We'll search with wildcard '*' and ask for only 'relatedImages' field # top=0 or top=1000 or more, depending on how many docs you expect # If you have more than 1,000 docs with images, you may want to paginate or set top=0 to gather them all. # For a large index, you'd do multiple calls in a loop. This is a basic example: results = await self.ai_search.search_documents( index_name=self.index_name, search_text="*", select_fields=["relatedImages"], top=1000 ) # results is a dict with: { "count": int, "documents": [...], "error": optional error } if "documents" in results and isinstance(results["documents"], list): for doc in results["documents"]: # doc might be a SearchDocument or dict. If dict, just do doc["relatedImages"] directly. images = doc.get("relatedImages") or [] if isinstance(images, list): for img_url in images: if img_url and isinstance(img_url, str): referenced_images.add(img_url.strip()) if "error" in results and results["error"]: logging.error(f"[images_deleted_files_purger] Error from AISearchClient: {results['error']}") except Exception as e: logging.error(f"[images_deleted_files_purger] Error retrieving referenced images: {e}") logging.info(f"[images_deleted_files_purger] Found {len(referenced_images)} referenced images.") return referenced_images async def _purge_unreferenced_images(self, referenced_images: set): """ Lists all blobs in the container. For each blob, if its URL isn't in 'referenced_images', delete it. """ logging.info("[images_deleted_files_purger] Starting purge of unreferenced images...") try: container_client = BlobContainerClient(self.blob_base_url, self.container_name) # Because container_client.list_blobs() is synchronous for the azure-storage-blob library, # we can iterate directly. If you want, you can gather them first in a list. blob_list = container_client.list_blobs() for blob in blob_list: blob_url = f"{self.blob_base_url}/{self.container_name}/{blob}" if blob_url not in referenced_images: logging.info(f"[images_deleted_files_purger] Deleting unreferenced blob: {blob_url}") container_client.delete_blob(blob) except Exception as e: logging.error(f"[images_deleted_files_purger] Error purging images: {e}") logging.info("[images_deleted_files_purger] Purge process finished.")