connectors/sharepoint/sharepoint_deleted_files_purger.py (216 lines of code) (raw):

import logging
import os
import asyncio
import aiohttp
from collections import defaultdict
from tools import KeyVaultClient
from tools import AISearchClient
from typing import Any, Dict, List, Optional


class SharepointDeletedFilesPurger:
    """Remove Azure AI Search chunks whose SharePoint parent item no longer exists.

    Configuration is read from environment variables in ``__init__``:
    ``SHAREPOINT_CONNECTOR_ENABLED``, ``SHAREPOINT_TENANT_ID``,
    ``SHAREPOINT_CLIENT_ID``, ``SHAREPOINT_CLIENT_SECRET_NAME``,
    ``AZURE_SEARCH_SHAREPOINT_INDEX_NAME``, ``SHAREPOINT_SITE_DOMAIN`` and
    ``SHAREPOINT_SITE_NAME``. The client secret itself is fetched from Key
    Vault by :meth:`initialize_clients`.
    """

    def __init__(self):
        # Initialize configuration from environment variables
        self.connector_enabled = os.getenv("SHAREPOINT_CONNECTOR_ENABLED", "false").lower() == "true"
        self.tenant_id = os.getenv("SHAREPOINT_TENANT_ID")
        self.client_id = os.getenv("SHAREPOINT_CLIENT_ID")
        self.client_secret_name = os.getenv("SHAREPOINT_CLIENT_SECRET_NAME", "sharepointClientSecret")
        self.index_name = os.getenv("AZURE_SEARCH_SHAREPOINT_INDEX_NAME", "ragindex")
        self.site_domain = os.getenv("SHAREPOINT_SITE_DOMAIN")
        self.site_name = os.getenv("SHAREPOINT_SITE_NAME")

        # Populated lazily by initialize_clients() / purge_deleted_files().
        self.keyvault_client: Optional[KeyVaultClient] = None
        self.client_secret: Optional[str] = None
        self.search_client: Optional[AISearchClient] = None
        self.site_id: Optional[str] = None
        self.access_token: Optional[str] = None

    async def initialize_clients(self) -> bool:
        """Initialize KeyVaultClient, retrieve secrets, and initialize AISearchClient.

        Returns:
            True when the secret was retrieved, all required settings are
            present and the search client was created; False otherwise (the
            reason is logged).
        """
        # Initialize Key Vault Client and retrieve SharePoint client secret
        try:
            self.keyvault_client = KeyVaultClient()
            self.client_secret = await self.keyvault_client.get_secret(self.client_secret_name)
            logging.debug("[sharepoint_purge_deleted_files] Retrieved sharepointClientSecret secret from Key Vault.")
        except Exception as e:
            logging.error(f"[sharepoint_purge_deleted_files] Failed to retrieve secret from Key Vault: {e}")
            return False
        finally:
            # The Key Vault client is only needed for this one lookup.
            if self.keyvault_client:
                await self.keyvault_client.close()

        # Check for missing environment variables
        required_vars = {
            "SHAREPOINT_TENANT_ID": self.tenant_id,
            "SHAREPOINT_CLIENT_ID": self.client_id,
            "SHAREPOINT_SITE_DOMAIN": self.site_domain,
            "SHAREPOINT_SITE_NAME": self.site_name,
            "AZURE_SEARCH_SHAREPOINT_INDEX_NAME": self.index_name,
        }
        missing_env_vars = [var for var, value in required_vars.items() if not value]
        if missing_env_vars:
            logging.error(
                f"[sharepoint_purge_deleted_files] Missing environment variables: {', '.join(missing_env_vars)}. "
                "Please set all required environment variables."
            )
            return False

        if not self.client_secret:
            logging.error(
                "[sharepoint_purge_deleted_files] SharePoint connector secret is not properly configured. "
                "Missing secret: sharepointClientSecret. Please set the required secret in Key Vault."
            )
            return False

        # Initialize AISearchClient
        try:
            self.search_client = AISearchClient()
            logging.debug("[sharepoint_purge_deleted_files] Initialized AISearchClient successfully.")
        except ValueError as ve:
            logging.error(f"[sharepoint_purge_deleted_files] AISearchClient initialization failed: {ve}")
            return False
        except Exception as e:
            logging.error(f"[sharepoint_purge_deleted_files] Unexpected error during AISearchClient initialization: {e}")
            return False

        return True

    async def get_graph_access_token(self) -> Optional[str]:
        """Obtain access token for Microsoft Graph API.

        Uses the OAuth2 client-credentials grant with the configured tenant,
        client id and client secret.

        Returns:
            The access token, or None if the request failed or the response
            did not contain a token (the reason is logged).
        """
        token_url = f"https://login.microsoftonline.com/{self.tenant_id}/oauth2/v2.0/token"
        headers = {
            "Content-Type": "application/x-www-form-urlencoded"
        }
        data = {
            "grant_type": "client_credentials",
            "client_id": self.client_id,
            "client_secret": self.client_secret,
            "scope": "https://graph.microsoft.com/.default"
        }

        async with aiohttp.ClientSession() as session:
            try:
                async with session.post(token_url, headers=headers, data=data) as resp:
                    if resp.status != 200:
                        error_response = await resp.text()
                        logging.error(f"[sharepoint_purge_deleted_files] Failed to obtain access token: {resp.status} - {error_response}")
                        return None

                    token_response = await resp.json()
                    access_token = token_response.get("access_token")
                    if not access_token:
                        # A 200 without a token is still a failure; do not
                        # report success for it.
                        logging.error("[sharepoint_purge_deleted_files] Token response did not contain an 'access_token' field.")
                        return None

                    logging.debug("[sharepoint_purge_deleted_files] Successfully obtained access token for Microsoft Graph API.")
                    return access_token
            except Exception as e:
                logging.error(f"[sharepoint_purge_deleted_files] Exception while obtaining access token: {e}")
                return None

    async def get_site_id(self) -> Optional[str]:
        """Retrieve the SharePoint site ID using Microsoft Graph API.

        Requests its own access token, then looks the site up by
        ``site_domain`` and ``site_name``.

        Returns:
            The site id, or None if the token or the site lookup failed.
        """
        access_token = await self.get_graph_access_token()
        if not access_token:
            return None

        url = f"https://graph.microsoft.com/v1.0/sites/{self.site_domain}:/sites/{self.site_name}?$select=id"
        headers = {
            "Authorization": f"Bearer {access_token}",
            "Accept": "application/json"
        }

        async with aiohttp.ClientSession() as session:
            try:
                async with session.get(url, headers=headers) as resp:
                    if resp.status != 200:
                        error_response = await resp.text()
                        logging.error(f"[sharepoint_purge_deleted_files] Failed to retrieve site ID: {resp.status} - {error_response}")
                        return None

                    data = await resp.json()
                    site_id = data.get("id", None)
                    if site_id:
                        logging.info("[sharepoint_purge_deleted_files] Successfully retrieved site ID.")
                        return site_id

                    logging.error("[sharepoint_purge_deleted_files] 'id' field not found in site response.")
                    return None
            except Exception as e:
                logging.error(f"[sharepoint_purge_deleted_files] Exception while retrieving site ID: {e}")
                return None

    async def check_parent_id_exists(
        self,
        parent_id: Any,
        headers: Dict[str, str],
        semaphore: asyncio.Semaphore,
        session: Optional[aiohttp.ClientSession] = None,
    ) -> bool:
        """Check if a SharePoint parent ID exists.

        Args:
            parent_id: Drive item id to look up under ``self.site_id``.
            headers: HTTP headers (including authorization) for the request.
            semaphore: Bounds the number of concurrent requests.
            session: Optional shared aiohttp session. When omitted, a
                temporary session is created for this single request.

        Returns:
            False only when Graph answers 404 for the item. True when the item
            exists (200) and also whenever the outcome is uncertain (any other
            status or an exception): the result drives deletion from the
            search index, so a throttled or failed request must never be
            treated as "file deleted".
        """
        check_url = f"https://graph.microsoft.com/v1.0/sites/{self.site_id}/drive/items/{parent_id}"
        async with semaphore:
            if session is not None:
                return await self._item_exists(session, check_url, parent_id, headers)
            async with aiohttp.ClientSession() as own_session:
                return await self._item_exists(own_session, check_url, parent_id, headers)

    async def _item_exists(
        self,
        session: aiohttp.ClientSession,
        check_url: str,
        parent_id: Any,
        headers: Dict[str, str],
    ) -> bool:
        """Issue the GET for one drive item and map the response to a bool."""
        try:
            async with session.get(check_url, headers=headers) as resp:
                if resp.status == 200:
                    logging.debug(f"[sharepoint_purge_deleted_files] SharePoint ID {parent_id} exists.")
                    return True
                if resp.status == 404:
                    logging.debug(f"[sharepoint_purge_deleted_files] SharePoint ID {parent_id} does not exist.")
                    return False

                error_text = await resp.text()
                logging.error(f"[sharepoint_purge_deleted_files] Error checking SharePoint ID {parent_id}: {resp.status} - {error_text}")
                # Unknown outcome (throttling, auth, server error): keep the chunks.
                return True
        except Exception as e:
            logging.error(f"[sharepoint_purge_deleted_files] Exception while checking SharePoint ID {parent_id}: {e}")
            # Unknown outcome (network failure etc.): keep the chunks.
            return True

    async def _close_search_client(self) -> None:
        """Close the AISearchClient if one was created; log instead of raising."""
        if self.search_client is None:
            return
        try:
            await self.search_client.close()
            logging.debug("[sharepoint_purge_deleted_files] Closed AISearchClient successfully.")
        except Exception as e:
            logging.error(f"[sharepoint_purge_deleted_files] Failed to close AISearchClient: {e}")

    async def _purge_orphaned_chunks(self) -> bool:
        """Find and delete index chunks whose SharePoint parent item is gone.

        Expects ``initialize_clients`` to have succeeded. Does not close the
        search client; the caller owns cleanup.

        Returns:
            True when the run reached the end (with or without deletions),
            False when it stopped early (the reason is logged).
        """
        # Obtain the site_id
        self.site_id = await self.get_site_id()
        if not self.site_id:
            logging.error("[sharepoint_purge_deleted_files] Unable to retrieve site_id. Aborting operation.")
            return False

        # Obtain access token for item checks
        self.access_token = await self.get_graph_access_token()
        if not self.access_token:
            logging.error("[sharepoint_purge_deleted_files] Cannot proceed without access token.")
            return False

        headers = {
            "Authorization": f"Bearer {self.access_token}"
        }

        # Retrieve all SharePoint-sourced chunks that carry a parent_id
        logging.info("[sharepoint_purge_deleted_files] Retrieving documents from Azure Search index.")
        try:
            # NOTE(review): top=0 is presumably treated by AISearchClient as
            # "no limit" — confirm against its implementation.
            search_results = await self.search_client.search_documents(
                index_name=self.index_name,
                search_text="*",
                filter_str="parent_id ne null and source eq 'sharepoint'",
                select_fields=["parent_id", "id", "metadata_storage_name"],
                top=0
            )
        except Exception as e:
            logging.error(f"[sharepoint_purge_deleted_files] Failed to retrieve documents from Azure Search: {e}")
            return False

        documents = search_results.get("documents", [])
        logging.info(f"[sharepoint_purge_deleted_files] Retrieved {len(documents)} SharePoint document chunks.")

        if not documents:
            logging.info("[sharepoint_purge_deleted_files] No document chunks to purge. Exiting function.")
            return False

        # Map parent_id to a list of document ids
        sharepoint_to_doc_ids = defaultdict(list)
        for doc in documents:
            if "parent_id" in doc and "id" in doc:
                sharepoint_to_doc_ids[doc["parent_id"]].append(doc["id"])

        parent_ids = list(sharepoint_to_doc_ids.keys())
        logging.info(f"[sharepoint_purge_deleted_files] Checking existence of {len(parent_ids)} SharePoint document(s).")

        semaphore = asyncio.Semaphore(10)  # Limit concurrent requests

        # One shared HTTP session for all checks instead of one per item.
        async with aiohttp.ClientSession() as session:
            existence_results = await asyncio.gather(
                *(
                    self.check_parent_id_exists(parent_id, headers, semaphore, session)
                    for parent_id in parent_ids
                )
            )

        # Identify all document IDs to delete for non-existing parent_ids
        doc_ids_to_delete: List[Any] = []
        for parent_id, exists in zip(parent_ids, existence_results):
            if not exists:
                doc_ids_to_delete.extend(sharepoint_to_doc_ids[parent_id])

        logging.info(f"[sharepoint_purge_deleted_files] {len(doc_ids_to_delete)} document chunks identified for purging.")

        if doc_ids_to_delete:
            batch_size = 100
            for i in range(0, len(doc_ids_to_delete), batch_size):
                batch = doc_ids_to_delete[i:i + batch_size]
                try:
                    await self.search_client.delete_documents(
                        index_name=self.index_name,
                        key_field="id",
                        key_values=batch
                    )
                    logging.info(f"[sharepoint_purge_deleted_files] Purging batch of {len(batch)} documents from Azure Search.")
                except Exception as e:
                    # Best effort: a failed batch does not stop the remaining ones.
                    logging.error(f"[sharepoint_purge_deleted_files] Failed to purge batch starting at index {i}: {e}")
        else:
            logging.info("[sharepoint_purge_deleted_files] No documents to purge.")

        return True

    async def purge_deleted_files(self) -> None:
        """Main method to purge deleted SharePoint files from Azure Search index.

        Does nothing when the connector is disabled or client initialization
        fails. Otherwise runs the purge and always closes the AISearchClient
        afterwards, including on early exits and unexpected exceptions.
        """
        logging.info("[sharepoint_purge_deleted_files] Started SharePoint purge connector function.")

        if not self.connector_enabled:
            logging.info(
                "[sharepoint_purge_deleted_files] SharePoint purge connector is disabled. "
                "Set SHAREPOINT_CONNECTOR_ENABLED to 'true' to enable the connector."
            )
            return

        # Initialize clients and configurations
        if not await self.initialize_clients():
            return

        completed = False
        try:
            completed = await self._purge_orphaned_chunks()
        finally:
            # Single cleanup point so no exit path leaks the search client.
            await self._close_search_client()

        if completed:
            logging.info("[sharepoint_purge_deleted_files] Completed SharePoint purge connector function.")

    async def run(self) -> None:
        """Run the purge process."""
        await self.purge_deleted_files()


# Example usage
# To run the purge process, you would typically do the following in an async context:
# import asyncio
#
# if __name__ == "__main__":
#     purger = SharepointDeletedFilesPurger()
#     asyncio.run(purger.run())