ees_sharepoint/sync_sharepoint.py (455 lines of code) (raw):

#
# Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
# or more contributor license agreements. Licensed under the Elastic License 2.0;
# you may not use this file except in compliance with the Elastic License 2.0.
#
"""sync_sharepoint module allows to sync data to Elastic Enterprise Search.

It's possible to run full syncs and incremental syncs with this module."""

import os
import threading
from urllib.parse import urljoin

from dateutil.parser import parse
from tika.tika import TikaException

from . import adapter
from .checkpointing import Checkpoint
from .usergroup_permissions import Permissions
from .utils import encode, extract, split_documents_into_equal_chunks, split_list_into_buckets

IDS_PATH = os.path.join(os.path.dirname(__file__), "doc_id.json")

SITE = "site"
LIST = "list"
ITEM = "item"
SITES = "sites"
LISTS = "lists"
LIST_ITEMS = "list_items"
DRIVE_ITEMS = "drive_items"


def get_results(logger, response, entity_name):
    """Attempts to fetch results from a Sharepoint Server response
    :param logger: logger used to report an empty or unusable response
    :param response: response from the sharepoint client
    :param entity_name: entity name whether it is SITES, LISTS, LIST_ITEMS OR DRIVE_ITEMS
    Returns:
        Parsed response: the value stored under ``d.results``, or None when the
        response is empty or that key is absent
    """
    if not response:
        logger.error(f"Empty response when fetching {entity_name}")
        return None
    if entity_name == "attachment" and not response.get("d", {}).get("results"):
        logger.info("Failed to fetch attachment")
        return None
    return response.get("d", {}).get("results")


class SyncSharepoint:
    """This class allows syncing objects from the SharePoint Server."""

    def __init__(
        self,
        config,
        logger,
        workplace_search_custom_client,
        sharepoint_client,
        start_time,
        end_time,
        queue,
    ):
        self.config = config
        self.logger = logger
        self.workplace_search_custom_client = workplace_search_custom_client
        self.sharepoint_client = sharepoint_client
        self.ws_source = config.get_value("workplace_search.source_id")
        self.objects = config.get_value("objects")
        self.site_collections = config.get_value("sharepoint.site_collections")
        self.enable_permission = config.get_value("enable_document_permission")
        self.start_time = start_time
        self.end_time = end_time
        self.sharepoint_thread_count = config.get_value("sharepoint_sync_thread_count")
        self.mapping_sheet_path = config.get_value("sharepoint_workplace_user_mapping")
        self.sharepoint_host = config.get_value("sharepoint.host_url")
        self.checkpoint = Checkpoint(config, logger)
        self.permissions = Permissions(
            self.sharepoint_client, self.workplace_search_custom_client, logger
        )
        self.queue = queue

    def get_schema_fields(self, document_name):
        """returns the schema of all the include_fields or exclude_fields specified in the configuration file.
        :param document_name: document name from SITES, LISTS, LIST_ITEMS OR DRIVE_ITEMS
        Returns:
            schema: included and excluded fields schema
        """
        fields = self.objects.get(document_name)
        adapter_schema = adapter.DEFAULT_SCHEMA[document_name]
        field_id = adapter_schema["id"]
        if fields:
            include_fields = fields.get("include_fields")
            exclude_fields = fields.get("exclude_fields")
            if include_fields:
                adapter_schema = {
                    key: val
                    for key, val in adapter_schema.items()
                    if val in include_fields
                }
            elif exclude_fields:
                adapter_schema = {
                    key: val
                    for key, val in adapter_schema.items()
                    if val not in exclude_fields
                }
        # The id mapping is always kept, whatever the include/exclude filters say.
        adapter_schema["id"] = field_id
        return adapter_schema

    def fetch_sites(self, parent_site_url, sites, ids, index, start_time, end_time):
        """This method fetches sites from a collection and invokes the
        index permission method to get the document level permissions.
        If the fetching is not successful, it logs proper message.
        Sub-sites are fetched recursively, one call per site found.
        :param parent_site_url: parent site relative path
        :param sites: dictionary of site path and its last updated time (updated in place)
        :param ids: structure containing id's of all objects (updated in place)
        :param index: index, boolean value
        :param start_time: start time for fetching the data
        :param end_time: end time for fetching the data
        Returns:
            sites: dictionary of site path and its last updated time
            documents: response of sharepoint GET call with fields specified in the schema
        """
        rel_url = f"{parent_site_url}/_api/web/webs"
        self.logger.info("Fetching the sites detail from url: %s" % (rel_url))
        query = self.sharepoint_client.get_query(start_time, end_time, SITES)
        response = self.sharepoint_client.get(rel_url, query, SITES)
        document_list = []
        response_data = get_results(self.logger, response, SITES)
        if not response_data:
            self.logger.info(
                "No sites were created in %s for this interval: start time: %s and end time: %s"
                % (parent_site_url, start_time, end_time)
            )
            return sites, []
        self.logger.info(
            "Successfully fetched and parsed %s sites response from SharePoint"
            % len(response_data)
        )

        schema = self.get_schema_fields(SITES)

        if index:
            for site_data in response_data:
                doc = {"type": SITE}
                # need to convert date to iso else workplace search throws error on date format Invalid field
                # value: Value '2021-09-29T08:13:00' cannot be parsed as a date (RFC 3339)"]}
                site_data["Created"] += "Z"
                for field, response_field in schema.items():
                    doc[field] = site_data.get(response_field)
                if self.enable_permission is True:
                    doc["_allow_permissions"] = self.fetch_permissions(
                        key=SITES, site=site_data["ServerRelativeUrl"]
                    )
                document_list.append(doc)
                ids["sites"].update({doc["id"]: site_data["ServerRelativeUrl"]})

        for result in response_data:
            site_server_url = result.get("ServerRelativeUrl")
            sites.update({site_server_url: result.get("LastItemModifiedDate")})
            # Recurse into the site to pick up its own sub-sites.
            _, documents = self.fetch_sites(
                site_server_url, sites, ids, index, start_time, end_time
            )
            document_list.extend(documents)
        return sites, document_list

    def fetch_lists(self, sites, ids, index):
        """This method fetches lists from all sites in a collection and invokes the
        index permission method to get the document level permissions.
        If the fetching is not successful, it logs proper message.
        :param sites: list of dictionaries of site path and its last updated time
        :param ids: structure containing id's of all objects (updated in place)
        :param index: index, boolean value
        Returns:
            lists: list id mapped to [parent web url, title, last item modified date]
            libraries: same structure, for the entries whose BaseType is 1
            documents: response of sharepoint GET call, with fields specified in the schema
        """
        self.logger.info("Fetching lists for all the sites")
        responses = []
        document = []
        if not sites:
            self.logger.info(
                "No list was created in this interval: start time: %s and end time: %s"
                % (self.start_time, self.end_time)
            )
            return [], [], {}
        schema_list = self.get_schema_fields(LISTS)
        for site_details in sites:
            for site, time_modified in site_details.items():
                # Skip sites whose last modification predates the sync window.
                if parse(self.start_time) > parse(time_modified):
                    continue
                rel_url = f"{site}/_api/web/lists"
                self.logger.info(
                    "Fetching the lists for site: %s from url: %s" % (site, rel_url)
                )

                query = self.sharepoint_client.get_query(
                    self.start_time, self.end_time, LISTS
                )
                response = self.sharepoint_client.get(rel_url, query, LISTS)

                response_data = get_results(self.logger, response, LISTS)
                if not response_data:
                    self.logger.info(
                        "No list was created for the site : %s in this interval: start time: %s and end time: %s"
                        % (site, self.start_time, self.end_time)
                    )
                    continue
                self.logger.info(
                    "Successfully fetched and parsed %s list response for site: %s from SharePoint"
                    % (len(response_data), site)
                )

                if index:
                    if not ids["lists"].get(site):
                        ids["lists"].update({site: {}})
                    for list_data in response_data:
                        doc = {"type": LIST}
                        for field, response_field in schema_list.items():
                            doc[field] = list_data.get(response_field)
                        relative_url = list_data["RootFolder"].get('ServerRelativeUrl')
                        if self.enable_permission is True:
                            doc["_allow_permissions"] = self.fetch_permissions(
                                key=LISTS,
                                site=site,
                                list_id=doc["id"],
                                list_url=list_data["ParentWebUrl"],
                                itemid=None,
                            )
                        doc["url"] = urljoin(
                            self.sharepoint_host,
                            relative_url,
                        )
                        document.append(doc)
                        ids["lists"][site].update(
                            {doc["id"]: list_data["Title"]}
                        )

                # Collected even when lists are not indexed: the items and
                # drive items fetches are driven by these responses.
                responses.append(response_data)
        lists = {}
        libraries = {}
        for response in responses:
            for result in response:
                details = [
                    result.get("ParentWebUrl"),
                    result.get("Title"),
                    result.get("LastItemModifiedDate"),
                ]
                if result.get("BaseType") == 1:
                    libraries[result.get("Id")] = details
                else:
                    lists[result.get("Id")] = details
        documents = {"type": LISTS, "data": document}
        return lists, libraries, documents

    def fetch_items(self, lists, ids):
        """This method fetches items from all the lists in a collection and
        invokes the index permission method to get the document level permissions.
        If the fetching is not successful, it logs proper message.
        :param lists: document lists, list id mapped to [url, title, last modified date]
        :param ids: structure containing id's of all objects (updated in place)
        Returns:
            documents: response of sharepoint GET call, with fields specified in the schema
        """
        responses = []
        #  here value is a list of url and title
        self.logger.info("Fetching all the items for the lists")
        if not lists:
            self.logger.info(
                "No item was created in this interval: start time: %s and end time: %s"
                % (self.start_time, self.end_time)
            )
        else:
            for value in lists.values():
                if not ids["list_items"].get(value[0]):
                    ids["list_items"].update({value[0]: {}})
            schema_item = self.get_schema_fields(LIST_ITEMS)
            for list_content, value in lists.items():
                # Skip lists whose last modification predates the sync window.
                if parse(self.start_time) > parse(value[2]):
                    continue
                rel_url = f"{value[0]}/_api/web/lists(guid'{list_content}')/items?$select=*,FileRef"
                self.logger.info(
                    "Fetching the items for list: %s from url: %s"
                    % (value[1], rel_url)
                )

                query = self.sharepoint_client.get_query(
                    self.start_time, self.end_time, LIST_ITEMS
                )
                response = self.sharepoint_client.get(rel_url, query, LIST_ITEMS)

                response_data = get_results(self.logger, response, LIST_ITEMS)
                if not response_data:
                    self.logger.info(
                        "No item was created for the list %s in this interval: start time: %s and end time: %s"
                        % (value[1], self.start_time, self.end_time)
                    )
                    continue
                self.logger.info(
                    "Successfully fetched and parsed %s listitem response for list: %s from SharePoint"
                    % (len(response_data), value[1])
                )

                document = []
                if not ids["list_items"][value[0]].get(list_content):
                    ids["list_items"][value[0]].update({list_content: []})
                rel_url = f"{value[0]}/_api/web/lists(guid'{list_content}')/items?$select=Attachments,AttachmentFiles,Title&$expand=AttachmentFiles"

                file_response_data = self.sharepoint_client.get(
                    rel_url, query=query, param_name="attachment"
                )
                if file_response_data:
                    file_response_data = get_results(
                        self.logger, file_response_data.json(), "attachment"
                    )

                for item in response_data:
                    doc = {"type": ITEM}
                    if item.get("Attachments") and file_response_data:
                        # Attachment rows are matched to the item by Title.
                        for data in file_response_data:
                            if item.get("Title") != data["Title"]:
                                continue
                            attachment_files = (
                                data.get("AttachmentFiles") or {}
                            ).get("results")
                            if not attachment_files:
                                # A row with the same title but no files would
                                # otherwise raise IndexError; keep looking.
                                continue
                            file_relative_url = attachment_files[0]["ServerRelativeUrl"]
                            url_s = f"{value[0]}/_api/web/GetFileByServerRelativeUrl('{encode(file_relative_url)}')/$value"
                            attachment_response = self.sharepoint_client.get(
                                url_s, query="", param_name="attachment"
                            )
                            doc["body"] = {}
                            if attachment_response and attachment_response.ok:
                                try:
                                    doc["body"] = extract(attachment_response.content)
                                except TikaException as exception:
                                    self.logger.error(
                                        "Error while extracting the contents from the attachment, Error %s"
                                        % (exception)
                                    )
                            break
                    for field, response_field in schema_item.items():
                        doc[field] = item.get(response_field)
                    if self.enable_permission is True:
                        doc["_allow_permissions"] = self.fetch_permissions(
                            key=LIST_ITEMS,
                            list_id=list_content,
                            list_url=value[0],
                            itemid=str(item["Id"]),
                        )
                    relative_url = item.get("FileRef")
                    doc["url"] = urljoin(self.sharepoint_host, relative_url)
                    document.append(doc)
                    if (
                        item.get("GUID")
                        not in ids["list_items"][value[0]][list_content]
                    ):
                        ids["list_items"][value[0]][list_content].append(
                            item.get("GUID")
                        )
                responses.extend(document)
        documents = {"type": LIST_ITEMS, "data": responses}
        return documents

    def fetch_drive_items(self, libraries, ids):
        """This method fetches files and folders from all the libraries in a collection and
        invokes the index permission method to get the document level permissions.
        If the fetching is not successful, it logs proper message.
        :param libraries: document libraries, library id mapped to [url, title, last modified date]
        :param ids: structure containing id's of all objects (updated in place)
        Returns:
            documents: response of sharepoint GET call, with fields specified in the schema
        """
        responses = []
        #  here value is a list of url and title of the library
        self.logger.info("Fetching all the files for the library")
        if not libraries:
            self.logger.info(
                "No file was created in this interval: start time: %s and end time: %s"
                % (self.start_time, self.end_time)
            )
        else:
            schema_drive = self.get_schema_fields(DRIVE_ITEMS)
            for lib_content, value in libraries.items():
                # Skip libraries whose last modification predates the sync window.
                if parse(self.start_time) > parse(value[2]):
                    continue
                if not ids["drive_items"].get(value[0]):
                    ids["drive_items"].update({value[0]: {}})
                rel_url = f"{value[0]}/_api/web/lists(guid'{lib_content}')/items?$select=Modified,Id,GUID,File,Folder&$expand=File,Folder"
                self.logger.info(
                    "Fetching the items for libraries: %s from url: %s"
                    % (value[1], rel_url)
                )
                query = self.sharepoint_client.get_query(
                    self.start_time, self.end_time, DRIVE_ITEMS
                )
                response = self.sharepoint_client.get(rel_url, query, DRIVE_ITEMS)
                response_data = get_results(self.logger, response, DRIVE_ITEMS)
                if not response_data:
                    self.logger.info(
                        "No item was created for the library %s in this interval: start time: %s and end time: %s"
                        % (value[1], self.start_time, self.end_time)
                    )
                    continue
                self.logger.info(
                    "Successfully fetched and parsed %s drive item response for library: %s from SharePoint"
                    % (len(response_data), value[1])
                )
                document = []
                if not ids["drive_items"][value[0]].get(lib_content):
                    ids["drive_items"][value[0]].update({lib_content: []})
                for item in response_data:
                    # An entry whose File part carries TimeLastModified is
                    # handled as a file, anything else as a folder.
                    if item["File"].get("TimeLastModified"):
                        obj_type = "File"
                        doc = {"type": "file"}
                        file_relative_url = item["File"]["ServerRelativeUrl"]
                        url_s = f"{value[0]}/_api/web/GetFileByServerRelativeUrl('{encode(file_relative_url)}')/$value"
                        file_response = self.sharepoint_client.get(
                            url_s, query="", param_name="attachment"
                        )
                        doc["body"] = {}
                        if file_response and file_response.ok:
                            try:
                                doc["body"] = extract(file_response.content)
                            except TikaException as exception:
                                # Log the file's server-relative url: the item
                                # query above does not select a "Url" field.
                                self.logger.error(
                                    "Error while extracting the contents from the file at %s, Error %s"
                                    % (file_relative_url, exception)
                                )
                    else:
                        obj_type = "Folder"
                        doc = {"type": "folder"}
                    for field, response_field in schema_drive.items():
                        doc[field] = item[obj_type].get(response_field)
                    doc["id"] = item.get("GUID")
                    if self.enable_permission is True:
                        # Fall back to "Id" (the field named in $select) when
                        # "ID" is absent, rather than sending the string "None".
                        item_id = item.get("ID", item.get("Id"))
                        doc["_allow_permissions"] = self.fetch_permissions(
                            key=DRIVE_ITEMS,
                            list_id=lib_content,
                            list_url=value[0],
                            itemid=str(item_id),
                        )
                    doc["url"] = urljoin(
                        self.sharepoint_host,
                        item[obj_type]["ServerRelativeUrl"],
                    )
                    document.append(doc)
                    if doc["id"] not in ids["drive_items"][value[0]][lib_content]:
                        ids["drive_items"][value[0]][lib_content].append(doc["id"])
                responses.extend(document)
        documents = {"type": DRIVE_ITEMS, "data": responses}
        return documents

    def get_roles(self, key, site, list_url, list_id, itemid):
        """Checks the permissions and returns the user roles.
        :param key: key, a string value
        :param site: site name to check the permission
        :param list_url: list url to access the list
        :param list_id: list id to check the permission
        :param itemid: item id to check the permission
        Returns:
            roles: user roles
        """
        if key == SITES:
            rel_url = site
            roles = self.permissions.fetch_users(key, rel_url)
        elif key == LISTS:
            rel_url = list_url
            roles = self.permissions.fetch_users(key, rel_url, list_id=list_id)
        else:
            rel_url = list_url
            roles = self.permissions.fetch_users(
                key, rel_url, list_id=list_id, item_id=itemid
            )
        return roles

    def fetch_permissions(
        self,
        key,
        site=None,
        list_id=None,
        list_url=None,
        itemid=None,
    ):
        """This method fetches the role assignments of the given object and
        returns the titles of the members holding them.
        :param key: key, a string value
        :param site: site name to index the permission for the site
        :param list_id: list id to index the permission for the list
        :param list_url: url of the list
        :param itemid: item id to index the permission for the item
        Returns:
            groups: list of member titles having access to the given object;
            empty when no roles could be fetched or parsed
        """
        roles = self.get_roles(key, site, list_url, list_id, itemid)
        if not roles:
            return []
        # get_results returns None for an empty or malformed payload; treat
        # that as "no roles" instead of failing on iteration.
        role_results = get_results(self.logger, roles.json(), "roles") or []
        return [role["Member"]["Title"] for role in role_results]

    def fetch_and_append_sites_to_queue(
        self, ids, collection, duration
    ):
        """Fetches and appends site details to queue
        :param ids: id collection of the all the objects
        :param collection: collection name
        :param duration: List of time range consisting of the [start_time, end_time]
        Returns:
            sites_path: single-element list holding the dictionary of site path
            and its last updated time
        """
        start_time, end_time = duration[0], duration[1]
        parent_site_url = f"/sites/{collection}"
        sites_path = []
        sites, document_list = self.fetch_sites(
            parent_site_url,
            {},
            ids,
            (SITES in self.objects),
            start_time,
            end_time,
        )
        documents = {"type": SITES, "data": document_list}
        # NOTE(review): this dict is never empty, so the batch is queued even
        # when document_list is empty.
        if documents:
            self.queue.put(documents)
            self.logger.debug(
                f"Thread ID {threading.get_ident()} added list of {len(documents.get('data'))} sites into the queue"
            )
        sites_path.append(sites)
        return sites_path

    def fetch_and_append_lists_to_queue(self, ids, sites_path):
        """Fetches and appends list details to queue
        :param ids: id collection of the all the objects
        :param sites_path: dictionaries of site path and its last updated time
        Returns:
            two-element list: [lists details, libraries details]
        """
        lists_details, libraries_details, documents = self.fetch_lists(
            sites_path, ids, (LISTS in self.objects)
        )
        if documents:
            self.queue.put(documents)
            self.logger.debug(
                f"Thread ID {threading.get_ident()} added list of {len(documents.get('data'))} lists into the queue"
            )
        return [lists_details, libraries_details]

    def fetch_and_append_list_items_to_queue(self, ids, lists_details):
        """Fetches and appends list_items to the queue
        :param ids: id collection of the all the objects
        :param lists_details: dictionary containing list name, list path and id
        """
        documents = self.fetch_items(lists_details, ids)
        if documents:
            self.queue.put(documents)
            self.logger.debug(
                f"Thread ID {threading.get_ident()} added list of {len(documents.get('data'))} list items into the queue"
            )

    def fetch_and_append_drive_items_to_queue(self, ids, libraries_details):
        """Fetches and appends the drive items to the queue
        :param ids: id collection of the all the objects
        :param libraries_details: dictionary containing library name, library path and id
        """
        documents = self.fetch_drive_items(libraries_details, ids)
        if documents:
            self.queue.put(documents)
            self.logger.debug(
                f"Thread ID {threading.get_ident()} added list of {len(documents.get('data'))} drive items into the queue"
            )

    def fetch_records_from_sharepoint(self, producer, date_ranges, thread_count, ids, collection):
        """Fetches Sites, Lists, List Items and Drive Items from sharepoint.
        :param producer: Producer function
        :param date_ranges: Partition of time range
        :param thread_count: Thread count
        :param ids: Content of the local storage
        :param collection: SharePoint server Collection name
        Returns:
            ids: the same ids structure that was passed in
        """
        # Fetch sites
        time_range_list = [(date_ranges[num], date_ranges[num + 1]) for num in range(0, thread_count)]
        sites = producer(thread_count, self.fetch_and_append_sites_to_queue,
                         [ids, collection], time_range_list, wait=True)
        # The collection root is always included, stamped with the end time.
        all_sites = [{f"/sites/{collection}": self.end_time}]
        for site in sites:
            all_sites.extend(site)

        # Fetch lists
        partitioned_sites = split_list_into_buckets(all_sites, thread_count)
        lists = producer(thread_count, self.fetch_and_append_lists_to_queue, [ids], partitioned_sites, wait=True)

        # Fetch list items
        lists_details, libraries_details = {}, {}
        for result in lists:
            lists_details.update(result[0])
            libraries_details.update(result[1])

        if LIST_ITEMS in self.objects:
            list_items = split_documents_into_equal_chunks(lists_details, thread_count)
            producer(thread_count, self.fetch_and_append_list_items_to_queue, [ids], list_items, wait=True)

        # Fetch library details
        if DRIVE_ITEMS in self.objects:
            libraries_items = split_documents_into_equal_chunks(libraries_details, thread_count)
            producer(thread_count, self.fetch_and_append_drive_items_to_queue, [ids], libraries_items, wait=True)
        return ids