# connectors/sharepoint/sharepoint_data_reader.py (375 lines of code) (raw):
"""
Copyright (c) 2023 Liam Cavanagh
This code is an adaptation of the original code available at https://github.com/liamca/sharepoint-indexing-azure-cognitive-search, licensed under the MIT License.
"""
import logging
from datetime import datetime, timedelta, timezone
from typing import Any, Dict, List, Optional, Union, Tuple
from urllib.parse import quote

import msal
import requests
from dotenv import load_dotenv
class SharePointDataReader:
    """Read file listings, metadata, permissions and content from a SharePoint
    document library through the Microsoft Graph REST API.

    Authentication uses the MSAL client-credentials flow (``_msgraph_auth``); the
    resulting bearer token is stored on ``self.access_token`` and reused by every
    request helper of this class.
    """

    # (connect, read) timeout in seconds passed to every ``requests.get`` call so a
    # stalled connection cannot block the caller forever. Override on a subclass or
    # instance if larger downloads need more time between received bytes.
    REQUEST_TIMEOUT: Tuple[float, float] = (10.0, 120.0)

    def __init__(
        self,
        tenant_id: Optional[str] = None,
        client_id: Optional[str] = None,
        client_secret: Optional[str] = None,
        graph_uri: str = "https://graph.microsoft.com",
        authority_template: str = "https://login.microsoftonline.com/{tenant_id}",
    ):
        """
        Initialize the SharePointDataReader.

        :param tenant_id: Tenant ID for Microsoft 365.
        :param client_id: Client ID for the application registered in Azure AD.
        :param client_secret: Client secret for the application registered in Azure AD.
        :param graph_uri: URI for Microsoft Graph API. It is stored and checked by
            ``_are_required_variables_missing``; the request URLs built by this class
            use the public ``https://graph.microsoft.com/v1.0`` endpoint.
        :param authority_template: Template for authority URL used in authentication;
            ``{tenant_id}`` is substituted when a tenant ID is given.
        """
        self.tenant_id = tenant_id
        self.client_id = client_id
        self.client_secret = client_secret
        self.graph_uri = graph_uri
        # Without a tenant there is no authority to authenticate against.
        self.authority = (
            authority_template.format(tenant_id=tenant_id) if tenant_id else None
        )
        self.scope = ["https://graph.microsoft.com/.default"]
        self.access_token = None

    def retrieve_sharepoint_files_content(
        self,
        site_domain: str,
        site_name: str,
        folder_path: Optional[str] = None,
        file_names: Optional[Union[str, List[str]]] = None,
        minutes_ago: Optional[int] = None,
        file_formats: Optional[List[str]] = None,
    ) -> Optional[List[Dict[str, Any]]]:
        """
        Retrieve contents of files from a specified SharePoint location, optionally
        filtering by last modification time and file formats.

        An access token must already be stored on the instance (for example by calling
        ``_msgraph_auth``): without one the site lookup fails and this method returns None.

        :param site_domain: The domain of the site in Microsoft Graph.
        :param site_name: The name of the site in Microsoft Graph.
        :param folder_path: Path to the folder within the drive, can include subfolders
            like 'test1/test2'. A leading slash is optional.
        :param file_names: Optional; the name or names of specific files to retrieve.
            If provided, only these files' content will be fetched.
        :param minutes_ago: Optional; filter for files created or modified within the
            specified number of minutes.
        :param file_formats: Optional; list of desired file formats (extensions, with
            or without a leading dot).
        :return: List of dictionaries with file metadata and content in bytes; an empty
            list when ``file_names`` matched nothing; None when configuration is
            missing, the site/drive could not be resolved, or no files were found.
        """
        if self._are_required_variables_missing():
            return None

        site_id, drive_id = self._get_site_and_drive_ids(site_domain, site_name)
        if not site_id or not drive_id:
            return None

        files = self._get_files(
            site_id, drive_id, folder_path, minutes_ago, file_formats
        )
        if not files:
            logging.info("[sharepoint_files_reader] No files found in the site's drive")
            return None

        return self._process_files(
            site_id, drive_id, folder_path, file_names, files, file_formats
        )

    def _msgraph_auth(
        self,
        client_id: Optional[str] = None,
        client_secret: Optional[str] = None,
        authority: Optional[str] = None,
    ) -> Optional[str]:
        """
        Authenticate with Microsoft Graph using MSAL for Python.

        :param client_id: Optional override for the instance's client ID.
        :param client_secret: Optional override for the instance's client secret.
        :param authority: Optional override for the instance's authority URL.
        :return: The access token (also stored on ``self.access_token``), or None when
            MSAL did not return a token.
        :raises ValueError: If any credential is missing.
        :raises Exception: Any error raised by MSAL while acquiring the token is logged
            and re-raised.
        """
        # Use provided parameters or fall back to instance attributes
        client_id = client_id or self.client_id
        client_secret = client_secret or self.client_secret
        authority = authority or self.authority

        # Check if all necessary credentials are provided
        if not all([client_id, client_secret, authority]):
            raise ValueError("Missing required authentication credentials.")

        app = msal.ConfidentialClientApplication(
            client_id=client_id, authority=authority, client_credential=client_secret
        )
        try:
            # Try the MSAL cache first, then request a fresh token.
            token_result = app.acquire_token_silent(self.scope, account=None)
            if token_result:
                logging.debug("[sharepoint_files_reader] Token retrieved from MSAL Cache.")
            else:
                token_result = app.acquire_token_for_client(scopes=self.scope)
                if token_result and "access_token" in token_result:
                    logging.debug("[sharepoint_files_reader] New access token retrieved.")

            # Whatever the source, a result without a token is a failure; checking
            # here avoids a KeyError on an error-shaped result.
            if not token_result or "access_token" not in token_result:
                logging.error("[sharepoint_files_reader] Error acquiring authorization token.")
                return None

            # Store the access token in the instance
            self.access_token = token_result["access_token"]
            return self.access_token
        except Exception as err:
            logging.error(f"[sharepoint_files_reader] Error in msgraph_auth: {err}")
            raise

    @staticmethod
    def _normalize_folder_path(folder_path: Optional[str]) -> str:
        """
        Normalize a drive-relative folder path for Graph path-based addressing.

        :param folder_path: Folder path with or without leading/trailing slashes.
        :return: '' for the drive root (None, empty or only slashes/whitespace),
            otherwise the path with exactly one leading slash and no trailing slash.
        """
        if not folder_path:
            return ""
        trimmed = folder_path.strip().strip("/")
        return f"/{trimmed}" if trimmed else ""

    @staticmethod
    def _format_url(site_id: str, drive_id: str, folder_path: str = None) -> str:
        """
        Formats the URL for accessing a (possibly nested) folder of a site drive.

        :param site_id: The site ID in Microsoft Graph.
        :param drive_id: The drive ID in Microsoft Graph.
        :param folder_path: path to the folder within the drive, can include subfolders,
            e.g. '/test/test1/test2/'. Leading and trailing slashes are optional.
        :return: The formatted URL, always ending in '/' so a segment such as
            'children' can be appended directly.
        """
        root_url = f"https://graph.microsoft.com/v1.0/sites/{site_id}/drives/{drive_id}/root"
        normalized = SharePointDataReader._normalize_folder_path(folder_path)
        # If folder_path is None, empty, or just "/" then return the root folder URL.
        if not normalized:
            return f"{root_url}/"
        # Path-based addressing: root:/<path>:/
        return f"{root_url}:{normalized}:/"

    @staticmethod
    def _build_file_content_url(
        site_id: str, drive_id: str, folder_path: Optional[str], file_name: str
    ) -> str:
        """
        Build the Graph URL that downloads the content of one file.

        :param site_id: The site ID in Microsoft Graph.
        :param drive_id: The drive ID in Microsoft Graph.
        :param folder_path: Folder containing the file; None/'' means the drive root.
        :param file_name: Raw (unencoded) file name; it is percent-encoded here so
            characters such as '#', '?' or '%' stay part of the path.
        :return: The content endpoint URL.
        """
        folder = SharePointDataReader._normalize_folder_path(folder_path)
        encoded_name = quote(file_name, safe="")
        return (
            f"https://graph.microsoft.com/v1.0/sites/{site_id}/drives/{drive_id}"
            f"/root:{folder}/{encoded_name}:/content"
        )

    def _make_ms_graph_request(
        self, url: str, access_token: Optional[str] = None
    ) -> Dict:
        """
        Make a GET request to the Microsoft Graph API.

        :param url: The URL for the Microsoft Graph API endpoint.
        :param access_token: Optional; The access token for Microsoft Graph API
            authentication. If not provided, uses the instance's stored token.
        :return: The JSON response from the Microsoft Graph API.
        :raises ValueError: If no access token is available.
        :raises Exception: HTTP errors and any other request/decoding error are logged
            and re-raised.
        """
        access_token = access_token or self.access_token
        if not access_token:
            raise ValueError("Access token is required for making API requests.")

        headers = {"Authorization": f"Bearer {access_token}"}
        try:
            response = requests.get(url, headers=headers, timeout=self.REQUEST_TIMEOUT)
            response.raise_for_status()
            return response.json()
        except requests.exceptions.HTTPError as err:
            logging.error(f"[sharepoint_files_reader] HTTP Error: {err}")
            raise
        except Exception as err:
            logging.error(f"[sharepoint_files_reader] Error in _make_ms_graph_request: {err}")
            raise

    def _get_site_id(
        self, site_domain: str, site_name: str, access_token: Optional[str] = None
    ) -> Optional[str]:
        """
        Get the Site ID from Microsoft Graph API.

        :param site_domain: The domain of the site.
        :param site_name: The name of the site.
        :param access_token: Optional token; defaults to the instance's stored token.
        :return: The site ID, or None if the lookup failed for any reason (the error
            is logged, not raised).
        """
        endpoint = (
            f"https://graph.microsoft.com/v1.0/sites/{site_domain}:/sites/{site_name}:/"
        )
        access_token = access_token or self.access_token
        try:
            logging.debug("[sharepoint_files_reader] Getting the Site ID...")
            result = self._make_ms_graph_request(endpoint, access_token)
            site_id = result.get("id")
            if site_id:
                logging.debug(f"[sharepoint_files_reader] Site ID retrieved: {site_id}")
            return site_id
        except Exception as err:
            logging.error(f"[sharepoint_files_reader] Error retrieving Site ID: {err}")
            return None

    def _get_drive_id(
        self, site_id: str, access_token: Optional[str] = None
    ) -> Optional[str]:
        """
        Get the ID of the default drive of a Microsoft Graph site.

        :param site_id: The site ID in Microsoft Graph.
        :param access_token: Optional token; defaults to the instance's stored token.
        :return: The drive ID, or None if the response carries no 'id'.
        :raises Exception: Request errors are logged and re-raised.
        """
        url = f"https://graph.microsoft.com/v1.0/sites/{site_id}/drive"
        access_token = access_token or self.access_token
        try:
            json_response = self._make_ms_graph_request(url, access_token)
            drive_id = json_response.get("id")
            logging.debug(f"[sharepoint_files_reader] Successfully retrieved drive ID: {drive_id}")
            return drive_id
        except Exception as err:
            logging.error(f"[sharepoint_files_reader] Error in get_drive_id: {err}")
            raise

    @staticmethod
    def _parse_graph_datetime(value: Optional[str]) -> Optional[datetime]:
        """
        Parse an ISO-8601 timestamp string into an aware datetime.

        :param value: Timestamp such as '2024-01-02T03:04:05Z'; fractional seconds of
            any length are accepted (truncated/padded to microseconds).
        :return: Timezone-aware datetime (timestamps without an offset are taken as
            UTC), or None if the value is missing or cannot be parsed.
        """
        if not value or not isinstance(value, str):
            return None
        text = value.strip()
        if text[-1:] in ("Z", "z"):
            text = f"{text[:-1]}+00:00"
        # datetime.fromisoformat on older Python versions only accepts 3 or 6
        # fractional digits, so normalise the fraction to exactly 6.
        head, dot, rest = text.partition(".")
        if dot:
            digit_count = len(rest) - len(rest.lstrip("0123456789"))
            if digit_count:
                fraction = rest[:digit_count][:6].ljust(6, "0")
                text = f"{head}.{fraction}{rest[digit_count:]}"
        try:
            parsed = datetime.fromisoformat(text)
        except ValueError:
            return None
        if parsed.tzinfo is None:
            parsed = parsed.replace(tzinfo=timezone.utc)
        return parsed

    @staticmethod
    def _is_recent(item: Dict[str, Any], time_limit: Optional[datetime]) -> bool:
        """
        Tell whether a drive item was created or modified at/after ``time_limit``.

        :param item: Drive item dictionary; timestamps are read from 'fileSystemInfo'
            and, when absent there, from the item's top-level keys of the same name.
        :param time_limit: Aware datetime cut-off, or None to disable the filter.
        :return: True if the filter is disabled or either timestamp is recent enough.
            Items without any parseable timestamp are kept (and a warning is logged)
            so they are not silently dropped from processing.
        """
        if time_limit is None:
            return True
        fs_info = item.get("fileSystemInfo") or {}
        stamps = [
            SharePointDataReader._parse_graph_datetime(fs_info.get(key) or item.get(key))
            for key in ("createdDateTime", "lastModifiedDateTime")
        ]
        known = [stamp for stamp in stamps if stamp is not None]
        if not known:
            logging.warning(
                f"[sharepoint_files_reader] No usable timestamp for item '{item.get('name')}'; keeping it."
            )
            return True
        return any(stamp >= time_limit for stamp in known)

    @staticmethod
    def _matches_file_formats(
        file_name: str, file_formats: Optional[List[str]]
    ) -> bool:
        """
        Case-insensitive extension check.

        :param file_name: Name of the file.
        :param file_formats: Desired extensions, with or without a leading dot
            ('pdf' and '.pdf' are equivalent). None/empty disables the filter.
        :return: True if the filter is disabled or the name ends with one of the
            extensions.
        """
        if not file_formats:
            return True
        suffixes = tuple(
            f".{fmt.lower().lstrip('.')}"
            for fmt in file_formats
            if fmt and fmt.strip(".")
        )
        # str.endswith with an empty tuple is False: a filter made only of blank
        # entries matches nothing.
        return file_name.lower().endswith(suffixes)

    def _get_files_in_site(
        self,
        site_id: str,
        drive_id: str,
        folder_path: Optional[str] = None,
        access_token: Optional[str] = None,
        minutes_ago: Optional[int] = None,
        file_formats: Optional[List[str]] = None,
    ) -> List[Dict]:
        """
        Get a list of items in a site's drive folder, optionally filtered by creation
        or last modification time and file formats.

        :param site_id: The site ID in Microsoft Graph.
        :param drive_id: The drive ID in Microsoft Graph.
        :param folder_path: Path to the folder within the drive, can include subfolders,
            e.g. '/test/test1/test2/'. None or '/' lists the drive root.
        :param access_token: The access token for Microsoft Graph API authentication.
            If not provided, the instance's stored token is used.
        :param minutes_ago: Optional integer to filter items created or updated within
            the specified number of minutes from now.
        :param file_formats: List of desired file formats.
        :return: A list of item details (all result pages are collected).
        :raises Exception: If there's an error in fetching file details (logged and
            re-raised).
        """
        if access_token is None:
            access_token = self.access_token

        # _format_url already returns the root URL when no folder is given.
        url = self._format_url(site_id, drive_id, folder_path) + "children"
        try:
            logging.info("[sharepoint_files_reader] Making request to Microsoft Graph API")
            items: List[Dict] = []
            next_url: Optional[str] = url
            # Graph pages large listings; follow '@odata.nextLink' until exhausted.
            while next_url:
                page = self._make_ms_graph_request(next_url, access_token)
                items.extend(page["value"])
                next_url = page.get("@odata.nextLink")
            logging.debug("[sharepoint_files_reader] Received response from Microsoft Graph API")

            time_limit = (
                datetime.now(timezone.utc) - timedelta(minutes=minutes_ago)
                if minutes_ago is not None
                else None
            )
            return [
                item
                for item in items
                if self._is_recent(item, time_limit)
                and self._matches_file_formats(item.get("name") or "", file_formats)
            ]
        except Exception as err:
            logging.error(f"[sharepoint_files_reader] Error in get_files_in_site: {err}")
            raise

    def _get_file_permissions(
        self, site_id: str, item_id: str, access_token: Optional[str] = None
    ) -> List[Dict]:
        """
        Get the permissions of a file in a site's default drive.

        :param site_id: The site ID in Microsoft Graph.
        :param item_id: The item ID of the file in Microsoft Graph.
        :param access_token: The access token for Microsoft Graph API authentication.
            If not provided, the instance's stored token is used.
        :return: A list of permission details.
        :raises Exception: If there's an error in fetching permission details (logged
            and re-raised).
        """
        if access_token is None:
            access_token = self.access_token
        url = f"https://graph.microsoft.com/v1.0/sites/{site_id}/drive/items/{item_id}/permissions"
        try:
            json_response = self._make_ms_graph_request(url, access_token)
            return json_response["value"]
        except Exception as err:
            logging.error(f"[sharepoint_files_reader] Error in get_file_permissions: {err}")
            raise

    @staticmethod
    def _get_read_access_entities(permissions) -> List[str]:
        """
        Extracts user IDs and site-group names of entities with read access from the
        given permissions data.

        Only permissions whose roles include 'read' or 'write' are considered. User IDs
        come from 'grantedToIdentitiesV2' and 'grantedToIdentities'; group names come
        from 'grantedToV2.siteGroup.displayName'.
        NOTE(review): identities granted directly through 'grantedToV2.user' / '.group'
        and the 'owner' role are not collected - confirm whether that is intended.

        :param permissions: List of permission dictionaries (non-dict entries and
            entries without 'roles' are ignored).
        :return: De-duplicated list of entities in first-seen order.
        """
        entities: List[str] = []
        seen = set()

        def remember(entity) -> None:
            # Keep first-seen order while de-duplicating in O(1).
            if entity and entity not in seen:
                seen.add(entity)
                entities.append(entity)

        for permission in permissions or []:
            if not isinstance(permission, dict) or "roles" not in permission:
                continue
            roles = permission.get("roles") or []
            if "read" not in roles and "write" not in roles:
                continue

            # Individual users (newer and legacy identity collections). 'or' guards
            # tolerate explicit JSON nulls in the payload.
            for key in ("grantedToIdentitiesV2", "grantedToIdentities"):
                for identity in permission.get(key) or []:
                    user = (identity or {}).get("user") or {}
                    remember(user.get("id"))

            # Site group the permission is granted to, identified by display name.
            site_group = (permission.get("grantedToV2") or {}).get("siteGroup") or {}
            remember(site_group.get("displayName"))

        return entities

    def _get_file_content_bytes(
        self,
        site_id: str,
        drive_id: str,
        folder_path: Optional[str],
        file_name: str,
        access_token: Optional[str] = None,
    ) -> Optional[bytes]:
        """
        Retrieve the content of a file as bytes from a specific site drive.

        :param site_id: The site ID in Microsoft Graph.
        :param drive_id: The drive ID in Microsoft Graph.
        :param folder_path: Path to the folder within the drive, can include subfolders.
        :param file_name: The name of the file.
        :param access_token: The access token for Microsoft Graph API authentication.
            If not provided, the instance's stored token is used.
        :return: Bytes content of the file, or None if no token is available, the
            request fails, or the response status is not 200 (the cause is logged).
        """
        if access_token is None:
            access_token = self.access_token
        if not access_token:
            logging.error(
                "[sharepoint_files_reader] Access token is required to retrieve file content."
            )
            return None

        endpoint = self._build_file_content_url(
            site_id, drive_id, folder_path, file_name
        )
        try:
            response = requests.get(
                endpoint,
                headers={"Authorization": "Bearer " + access_token},
                timeout=self.REQUEST_TIMEOUT,
            )
            if response.status_code != 200:
                logging.error(
                    f"[sharepoint_files_reader] Failed to retrieve file content. Status code: {response.status_code}, Response: {response.text}"
                )
                return None
            return response.content
        except requests.exceptions.RequestException as req_err:
            logging.error(f"[sharepoint_files_reader] Request error: {req_err}")
            return None

    def _retrieve_file_content(
        self, site_id: str, drive_id: str, folder_path: Optional[str], file_name: str
    ) -> Optional[bytes]:
        """
        Retrieve the content of a specific file from SharePoint.

        :param site_id: SharePoint site ID.
        :param drive_id: SharePoint drive ID.
        :param folder_path: Path to the folder containing the file.
        :param file_name: Name of the file to retrieve.
        :return: Content of the file as bytes, or None if retrieval fails.
        """
        return self._get_file_content_bytes(
            site_id, drive_id, folder_path, file_name
        )

    @staticmethod
    def _extract_file_metadata(file_data: Dict[str, Any]) -> Dict[str, Any]:
        """
        Extracts specific information from the file data.

        :param file_data: The original drive item dictionary.
        :return: A dictionary with the keys 'id', 'webUrl', 'size', 'createdBy',
            'createdDateTime', 'lastModifiedDateTime' and 'lastModifiedBy'. Fields
            missing from the input are None. Timestamps come from 'fileSystemInfo' and
            get a trailing 'Z' appended when they lack one; the 'By' fields hold the
            user's display name.
        """
        fs_info = file_data.get("fileSystemInfo") or {}

        def utc_suffixed(date_str: Optional[str]) -> Optional[str]:
            # Append 'Z' if it's missing to indicate UTC timezone
            if not date_str:
                return None
            return date_str if date_str.endswith("Z") else f"{date_str}Z"

        def display_name(actor_key: str) -> Optional[str]:
            actor = file_data.get(actor_key) or {}
            return (actor.get("user") or {}).get("displayName")

        return {
            "id": file_data.get("id"),
            "webUrl": file_data.get("webUrl"),
            "size": file_data.get("size"),
            "createdBy": display_name("createdBy"),
            "createdDateTime": utc_suffixed(fs_info.get("createdDateTime")),
            "lastModifiedDateTime": utc_suffixed(fs_info.get("lastModifiedDateTime")),
            "lastModifiedBy": display_name("lastModifiedBy"),
        }

    def _are_required_variables_missing(self) -> bool:
        """
        Checks if any of the required instance variables are missing.

        The variables checked are 'tenant_id', 'client_id', 'client_secret',
        'graph_uri' and 'authority'. If any of them is unset/empty an error naming
        them is logged.

        :return: True if any of the required instance variables are missing,
            False otherwise.
        """
        required_vars = {
            "tenant_id": self.tenant_id,
            "client_id": self.client_id,
            "client_secret": self.client_secret,
            "graph_uri": self.graph_uri,
            "authority": self.authority,
        }
        missing_vars = [name for name, value in required_vars.items() if not value]
        if missing_vars:
            logging.error(
                f"[sharepoint_files_reader] Required instance variables for SharePointDataReader are not set: {', '.join(missing_vars)}. Please pass them to the constructor or set them manually."
            )
            return True
        return False

    def _get_site_and_drive_ids(
        self, site_domain: str, site_name: str
    ) -> Tuple[Optional[str], Optional[str]]:
        """
        Retrieves the site ID and drive ID for a given site domain and site name.

        :param site_domain: The domain of the site.
        :param site_name: The name of the site.
        :return: A tuple containing the site ID and drive ID, or (None, None) if either
            ID could not be retrieved.
        :raises Exception: Errors raised by the drive lookup propagate.
        """
        site_id = self._get_site_id(site_domain, site_name)
        if not site_id:
            logging.error("[sharepoint_files_reader] Failed to retrieve site_id")
            return None, None

        drive_id = self._get_drive_id(site_id)
        if not drive_id:
            logging.error("[sharepoint_files_reader] Failed to retrieve drive ID")
            return None, None

        return site_id, drive_id

    def _get_files(
        self,
        site_id: str,
        drive_id: str,
        folder_path: Optional[str],
        minutes_ago: Optional[int],
        file_formats: Optional[List[str]],
    ) -> List[Dict]:
        """
        Retrieves the files in a site drive using the instance's stored token.

        :param site_id: The site ID in Microsoft Graph.
        :param drive_id: The drive ID in Microsoft Graph.
        :param folder_path: Optional path to the folder within the drive, can include
            subfolders.
        :param minutes_ago: Optional integer to filter files created or updated within
            the specified number of minutes from now.
        :param file_formats: List of desired file formats.
        :return: A list of file details.
        """
        return self._get_files_in_site(
            site_id=site_id,
            drive_id=drive_id,
            folder_path=folder_path,
            minutes_ago=minutes_ago,
            file_formats=file_formats,
        )

    def _process_files(
        self,
        site_id: str,
        drive_id: str,
        folder_path: Optional[str],
        file_names: Optional[Union[str, List[str]]],
        files: List[Dict],
        file_formats: Optional[List[str]],
    ) -> List[Dict[str, Any]]:
        """
        Downloads content, metadata and read-access entities for the given drive items.

        :param site_id: The site ID in Microsoft Graph.
        :param drive_id: The drive ID in Microsoft Graph.
        :param folder_path: Optional path to the folder within the drive, can include
            subfolders.
        :param file_names: The name(s) of specific files to filter. Can be a string or
            a list of strings.
        :param files: List of drive items to process. Folders, items without a name and
            items whose name fails ``_is_file_format_valid`` are skipped.
        :param file_formats: List of desired file formats.
        :return: A list of dictionaries, one per processed file, holding 'content'
            (bytes, or None if the download failed) plus the formatted metadata.
        :raises Exception: Errors from the permission lookup propagate.
        """
        # Handle both string and list for file_names
        if isinstance(file_names, str):
            file_names = [file_names]

        # Filter files based on the given file_names
        if file_names:
            wanted = set(file_names)
            files = [item for item in files if item.get("name") in wanted]

        if not files:
            logging.error("[sharepoint_files_reader] No matching files found")
            return []

        file_contents = []
        for item in files:
            file_name = item.get("name")
            if not file_name:
                continue
            # A folder can have a dot in its name (e.g. 'v1.0'); the 'folder' facet
            # identifies it reliably, and folders have no content to download.
            if "folder" in item:
                continue
            if not self._is_file_format_valid(file_name, file_formats):
                continue

            metadata = self._extract_file_metadata(item)
            content = self._retrieve_file_content(
                site_id, drive_id, folder_path, file_name
            )
            read_access_entities = self._get_read_access_entities(
                self._get_file_permissions(site_id, item["id"])
            )
            file_contents.append(
                {
                    "content": content,
                    **self._format_metadata(metadata, file_name, read_access_entities),
                }
            )
        return file_contents

    def _is_file_format_valid(
        self, file_name: str, file_formats: Optional[List[str]]
    ) -> bool:
        """
        Checks if the format of a file is valid.

        :param file_name: The name of the file.
        :param file_formats: List of desired file formats (with or without a leading
            dot); None/empty accepts every extension.
        :return: True if the name contains a '.' and matches the desired formats,
            False otherwise.
        """
        return "." in file_name and self._matches_file_formats(file_name, file_formats)

    def _format_metadata(
        self,
        metadata: Dict,
        file_name: str,
        users_by_role: List[str],
    ) -> Dict:
        """
        Format and return file metadata.

        :param metadata: Dictionary of file metadata as produced by
            ``_extract_file_metadata``.
        :param file_name: Name of the file.
        :param users_by_role: Entities (user IDs / group names) with read access;
            stored under 'read_access_entity'.
        :return: Formatted metadata as a dictionary.
        """
        return {
            "id": metadata["id"],
            "source": metadata["webUrl"],
            "name": file_name,
            "size": metadata["size"],
            "created_by": metadata["createdBy"],
            "created_datetime": metadata["createdDateTime"],
            "last_modified_datetime": metadata["lastModifiedDateTime"],
            "last_modified_by": metadata["lastModifiedBy"],
            "read_access_entity": users_by_role,
        }