connectors/sharepoint/sharepoint_files_indexer.py
import asyncio
import logging
import os
from typing import Any, Dict, Optional

from chunking import ChunkerFactory, DocumentChunker
from connectors import SharePointDataReader
from tools import AISearchClient, KeyVaultClient


class SharepointFilesIndexer:
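    """Index SharePoint files into an Azure AI Search index.

    Files are chunked with DocumentChunker, enriched with SharePoint metadata,
    and re-indexed only when they have changed since the previous run.
    """
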
def __init__(self):
# Initialize configuration from environment variables
self.connector_enabled = os.getenv("SHAREPOINT_CONNECTOR_ENABLED", "false").lower() == "true"
self.tenant_id = os.getenv("SHAREPOINT_TENANT_ID")
self.client_id = os.getenv("SHAREPOINT_CLIENT_ID")
self.site_domain = os.getenv("SHAREPOINT_SITE_DOMAIN")
self.site_name = os.getenv("SHAREPOINT_SITE_NAME")
self.folder_path = os.getenv("SHAREPOINT_SITE_FOLDER", "/")
self.sharepoint_client_secret_name = os.getenv("SHAREPOINT_CLIENT_SECRET_NAME", "sharepointClientSecret")
self.index_name = os.getenv("AZURE_SEARCH_SHAREPOINT_INDEX_NAME", "ragindex")
self.file_formats = os.getenv("SHAREPOINT_FILES_FORMAT")
if self.file_formats:
# Convert comma-separated string into a list, trimming whitespace
self.file_formats = [fmt.strip() for fmt in self.file_formats.split(",")]
else:
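            # No explicit format list configured; fall back to every extension the available chunkers support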
supported_extensions = ChunkerFactory.get_supported_extensions()
self.file_formats = [fmt.strip() for fmt in supported_extensions.split(",")]
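        # Clients and the retrieved secret are populated later by initialize_clients()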
self.keyvault_client: Optional[KeyVaultClient] = None
self.client_secret: Optional[str] = None
self.sharepoint_data_reader: Optional[SharePointDataReader] = None
self.search_client: Optional[AISearchClient] = None
async def initialize_clients(self) -> bool:
"""Initialize KeyVaultClient, retrieve secrets, and initialize SharePointDataReader and AISearchClient."""
# Initialize Key Vault Client and retrieve SharePoint client secret
try:
self.keyvault_client = KeyVaultClient()
self.client_secret = await self.keyvault_client.get_secret(self.sharepoint_client_secret_name)
logging.debug("[sharepoint_files_indexer] Retrieved sharepointClientSecret secret from Key Vault.")
except Exception as e:
logging.error(f"[sharepoint_files_indexer] Failed to retrieve secret from Key Vault: {e}")
return False
finally:
if self.keyvault_client:
await self.keyvault_client.close()
# Check for missing environment variables
required_vars = {
"SHAREPOINT_TENANT_ID": self.tenant_id,
"SHAREPOINT_CLIENT_ID": self.client_id,
"SHAREPOINT_SITE_DOMAIN": self.site_domain,
"SHAREPOINT_SITE_NAME": self.site_name,
}
missing_env_vars = [var for var, value in required_vars.items() if not value]
if missing_env_vars:
logging.error(
f"[sharepoint_files_indexer] Missing environment variables: {', '.join(missing_env_vars)}. "
"Please set all required environment variables."
)
return False
if not self.client_secret:
logging.error(
"[sharepoint_files_indexer] SharePoint connector secret is not properly configured. "
"Missing secret: sharepointClientSecret. Please set the required secret in Key Vault."
)
return False
# Initialize SharePointDataReader
try:
self.sharepoint_data_reader = SharePointDataReader(
tenant_id=self.tenant_id,
client_id=self.client_id,
client_secret=self.client_secret,
)
self.sharepoint_data_reader._msgraph_auth()
logging.debug("[sharepoint_files_indexer] Authenticated with Microsoft Graph successfully.")
except Exception as e:
logging.error(f"[sharepoint_files_indexer] Authentication failed: {e}")
return False
# Initialize AISearchClient
try:
self.search_client = AISearchClient()
logging.debug("[sharepoint_files_indexer] Initialized AISearchClient successfully.")
except ValueError as ve:
logging.error(f"[sharepoint_files_indexer] AISearchClient initialization failed: {ve}")
return False
except Exception as e:
logging.error(f"[sharepoint_files_indexer] Unexpected error during AISearchClient initialization: {e}")
return False
return True
async def delete_existing_chunks(self, existing_chunks: Dict[str, Any], file_name: str) -> None:
"""Delete existing document chunks from the search index."""
chunk_ids = [doc['id'] for doc in existing_chunks.get('documents', []) if 'id' in doc]
if not chunk_ids:
logging.warning(f"[sharepoint_files_indexer] No valid 'id's found for existing chunks of '{file_name}'. Skipping deletion.")
return
try:
await self.search_client.delete_documents(index_name=self.index_name, key_field="id", key_values=chunk_ids)
logging.debug(f"[sharepoint_files_indexer] Deleted {len(chunk_ids)} existing chunks for '{file_name}'.")
except Exception as e:
logging.error(f"[sharepoint_files_indexer] Failed to delete existing chunks for '{file_name}': {e}")
async def index_file(self, data: Dict[str, Any]) -> None:
"""Index a single file's metadata into the search index."""
try:
await self.search_client.index_document(index_name=self.index_name, document=data)
logging.debug(f"[sharepoint_files_indexer] Indexed file '{data['fileName']}' successfully.")
except Exception as e:
logging.error(f"[sharepoint_files_indexer] Failed to index file '{data['fileName']}': {e}")
async def process_file(self, file: Dict[str, Any], semaphore: asyncio.Semaphore) -> None:
"""Process and index a single SharePoint file."""
async with semaphore:
file_name = file.get("name")
if not file_name:
logging.warning("[sharepoint_files_indexer] File name is missing. Skipping file.")
return
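            # Metadata fields supplied by SharePointDataReader for each retrieved file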
sharepoint_id = file.get("id")
document_bytes = file.get("content")
document_url = file.get("source")
last_modified_datetime = file.get("last_modified_datetime")
read_access_entity = file.get("read_access_entity")
logging.info(f"[sharepoint_files_indexer] Processing File: {file_name}. Last Modified: {last_modified_datetime}")
data = {
"sharepointId": sharepoint_id,
"fileName": file_name,
"documentBytes": document_bytes,
"documentUrl": document_url
}
# Fetch existing chunks related to the file
try:
existing_chunks = await self.search_client.search_documents(
index_name=self.index_name,
search_text="*",
filter_str=f"parent_id eq '{sharepoint_id}' and source eq 'sharepoint'",
select_fields=['id', 'metadata_storage_last_modified', 'metadata_storage_name'],
                    top=0  # downstream code reads 'count' and 'documents', so the AISearchClient wrapper presumably treats top=0 as "no limit"
)
except Exception as e:
logging.error(f"[sharepoint_files_indexer] Failed to search existing chunks for '{file_name}': {e}")
return
if existing_chunks.get('count', 0) == 0:
logging.debug(f"[sharepoint_files_indexer] No existing chunks found for '{file_name}'. Proceeding to index.")
else:
indexed_last_modified_str = existing_chunks['documents'][0].get('metadata_storage_last_modified')
if not indexed_last_modified_str:
logging.warning(
f"[sharepoint_files_indexer] 'metadata_storage_last_modified' not found for existing chunks of '{file_name}'. "
"Deleting existing chunks and proceeding to re-index."
)
await self.delete_existing_chunks(existing_chunks, file_name)
else:
                    # Compare modification times (assumes both values share the same comparable format, e.g. ISO-8601 strings)
if last_modified_datetime <= indexed_last_modified_str:
logging.info(f"[sharepoint_files_indexer] '{file_name}' has not been modified since last indexing. Skipping.")
return # Skip indexing as no changes detected
else:
# If the file has been modified, delete existing chunks and re-index
logging.debug(f"[sharepoint_files_indexer] '{file_name}' has been modified. Deleting existing chunks and re-indexing.")
await self.delete_existing_chunks(existing_chunks, file_name)
# Chunk and index document
chunks, errors, warnings = DocumentChunker().chunk_documents(data)
if warnings:
for warning in warnings:
logging.warning(f"[sharepoint_files_indexer] Warning when chunking {file_name}: {warning.get('message', 'No message')}")
if errors:
for error in errors:
logging.error(f"[sharepoint_files_indexer] Skipping {file_name}. Error: {error.get('message', 'No message')}")
return # Skip this file
# Ingest the chunks into the index
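            # Stamp each chunk with file-level metadata; parent_id ties chunks back to the SharePoint item
            # (used above when deleting stale chunks) and metadata_security_id presumably drives security trimming at query time.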
for chunk in chunks:
chunk["id"] = f"{sharepoint_id}_{chunk.get('chunk_id', 'unknown')}"
chunk["parent_id"] = sharepoint_id
chunk["metadata_storage_path"] = document_url
chunk["metadata_storage_name"] = file_name
chunk["metadata_storage_last_modified"] = last_modified_datetime
chunk["metadata_security_id"] = read_access_entity
chunk["source"] = "sharepoint"
try:
await self.search_client.index_document(self.index_name, chunk)
except Exception as e:
logging.error(f"[sharepoint_files_indexer] Failed to index chunk for '{file_name}': {e}")
logging.info(f"[sharepoint_files_indexer] Indexed {file_name} chunks.")
async def run(self) -> None:
"""Main method to run the SharePoint files indexing process."""
logging.info("[sharepoint_files_indexer] Started sharepoint files index run.")
if not self.connector_enabled:
logging.info("[sharepoint_files_indexer] SharePoint connector is disabled. Set SHAREPOINT_CONNECTOR_ENABLED to 'true' to enable the connector.")
return
# Initialize clients and configurations
if not await self.initialize_clients():
return
# Retrieve SharePoint files content
try:
files = self.sharepoint_data_reader.retrieve_sharepoint_files_content(
site_domain=self.site_domain,
site_name=self.site_name,
folder_path=self.folder_path,
file_formats=self.file_formats,
)
number_files = len(files) if files else 0
logging.info(f"[sharepoint_files_indexer] Retrieved {number_files} files from SharePoint.")
except Exception as e:
logging.error(f"[sharepoint_files_indexer] Failed to retrieve files. Check your sharepoint configuration environment variables. Error: {e}")
return
if not files:
logging.info("[sharepoint_files_indexer] No files retrieved from SharePoint.")
await self.search_client.close()
return
semaphore = asyncio.Semaphore(10) # Limit concurrent file processing
# Create tasks to process all files in parallel
tasks = [self.process_file(file, semaphore) for file in files]
await asyncio.gather(*tasks)
# Close the AISearchClient
try:
await self.search_client.close()
logging.debug("[sharepoint_files_indexer] Closed AISearchClient successfully.")
except Exception as e:
logging.error(f"[sharepoint_files_indexer] Failed to close AISearchClient: {e}")
logging.info("[sharepoint_files_indexer] SharePoint connector finished.")
# Example usage
# To run the indexer, you would typically do the following in an async context:
# import asyncio
#
# if __name__ == "__main__":
# indexer = SharepointFilesIndexer()
# asyncio.run(indexer.run())
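#
# Example environment configuration (a sketch: the variable names are the ones read in __init__ above;
# the values shown are placeholders, not defaults from any real deployment):
#
#   SHAREPOINT_CONNECTOR_ENABLED=true
#   SHAREPOINT_TENANT_ID=<tenant-guid>
#   SHAREPOINT_CLIENT_ID=<app-registration-client-id>
#   SHAREPOINT_CLIENT_SECRET_NAME=sharepointClientSecret
#   SHAREPOINT_SITE_DOMAIN=contoso.sharepoint.com
#   SHAREPOINT_SITE_NAME=MySite
#   SHAREPOINT_SITE_FOLDER=/Shared Documents
#   SHAREPOINT_FILES_FORMAT=pdf,docx,pptx   (optional; defaults to all chunker-supported formats)
#   AZURE_SEARCH_SHAREPOINT_INDEX_NAME=ragindex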