ees_network_drive/files.py (165 lines of code) (raw):

#
# Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
# or more contributor license agreements. Licensed under the Elastic License 2.0;
# you may not use this file except in compliance with the Elastic License 2.0.
#
"""Module responsible for fetching the files from the Network Drives and returning a document with
    all file details in JSON format.
"""
import errno
import os
import tempfile
import time
from pathlib import Path

from dateutil.parser import parse
from tika.tika import TikaException

from . import adapter, constant
from .utils import extract, fetch_users_from_csv_file, hash_id

ACCESS_ALLOWED_TYPE = 0
ACCESS_DENIED_TYPE = 1
ACCESS_MASK_DENIED_WRITE_PERMISSION = 278
ACCESS_MASK_ALLOWED_WRITE_PERMISSION = 1048854
STATUS_NO_SUCH_FILE = 3221225487
STATUS_NO_SUCH_DEVICE = 3221225486
STATUS_OBJECT_NAME_NOT_FOUND = 3221225524
STATUS_OBJECT_PATH_NOT_FOUND = 3221225530


class Files:
    """This class fetches objects from Network Drives"""

    def __init__(self, logger, config, client):
        self.logger = logger
        self.user_mapping = config.get_value("network_drive_enterprise_search.user_mapping")
        self.drive_path = config.get_value("network_drive.path")
        self.server_ip = config.get_value("network_drive.server_ip")
        self.enable_document_permission = config.get_value("enable_document_permission")
        self.network_drives_client = client

    def is_file_present_on_network_drive(self, smb_connection, drive_name, folder_path, file_structure,
                                         ids_list, visited_folders, deleted_folders):
        """Checks whether a folder/file is still present on the Network Drive
        :param smb_connection: SMB connection object
        :param drive_name: service name of the Network Drives
        :param folder_path: the relative path of the folder
        :param file_structure: dictionary containing each folder and the list of files inside it
        :param ids_list: list of ids of deleted files
        :param visited_folders: list of folder paths already visited
        :param deleted_folders: list of deleted folder paths
        Returns:
            folder_deleted: boolean value indicating whether the folder is deleted or not
        """
        folder_deleted = False
        try:
            drive_path = Path(self.drive_path)
            available_files = smb_connection.listPath(drive_path.parts[0], folder_path)
            for file in available_files:
                if file_structure[folder_path].get(file.filename):
                    file_structure[folder_path].pop(file.filename)
            ids_list.extend(list(file_structure[folder_path].values()))
            visited_folders.append(folder_path)
        except Exception as exception:
            status = exception.smb_messages[-1].status
            if status in [STATUS_NO_SUCH_FILE, STATUS_NO_SUCH_DEVICE, STATUS_OBJECT_NAME_NOT_FOUND]:
                for folder in file_structure.keys():
                    if folder_path in folder:
                        deleted_folders.append(folder)
                        self.logger.info(f"{folder} entire folder is deleted.")
                deleted_folders.append(folder_path)
                return True
            elif status == STATUS_OBJECT_PATH_NOT_FOUND:
                folder_path, _ = os.path.split(folder_path)
                folder_deleted = self.is_file_present_on_network_drive(smb_connection, drive_name, folder_path,
                                                                       file_structure, ids_list, visited_folders,
                                                                       deleted_folders)
            else:
                self.logger.exception(f"Error while retrieving files from drive {drive_name}. Error: {exception}")
        return folder_deleted
    def recursive_fetch(self, smb_connection, service_name, path, store):
        """This method is used to recursively fetch folder paths from Network Drives.
        :param smb_connection: SMB connection object
        :param service_name: name of the drive
        :param path: path of the Network Drives
        :param store: temporary storage for fetched ids from the drive
        :returns: list of all the folder paths in the network drives
        """
        try:
            file_list = smb_connection.listPath(service_name, rf'{path}', search=16)
        except Exception as exception:
            self.logger.exception(f"Unknown error while fetching files: {exception}")
            return store
        for file in file_list:
            if file.filename not in ['.', '..']:
                file_name = file.filename
                self.recursive_fetch(smb_connection, service_name, os.path.join(path, file_name), store)
        store.append(path)
        return store

    def extract_files(self, smb_connection, service_name, path, time_range, indexing_rules):
        """This method fetches the details of the files present inside a folder.
        :param smb_connection: SMB connection object
        :param service_name: name of the drive
        :param path: path of the Network Drives
        :param time_range: start and end time
        :param indexing_rules: object of indexing_rules
        :returns: dictionary of ids and file details for the files fetched
        """
        storage = {}
        try:
            file_list = smb_connection.listPath(service_name, rf'{path}')
        except Exception as exception:
            self.logger.exception(f"Unknown error while extracting files from folder {path}. Error: {exception}")
            return storage
        for file in file_list:
            if not file.isDirectory:
                file_name = file.filename
                updated_at = time.strftime(constant.RFC_3339_DATETIME_FORMAT,
                                           time.gmtime(file.last_attr_change_time))
                created_at = time.strftime(constant.RFC_3339_DATETIME_FORMAT, time.gmtime(file.create_time))
                file_path = os.path.join(path, file_name)
                file_details = {
                    'updated_at': updated_at,
                    'file_type': os.path.splitext(file_name)[1],
                    'file_size': file.file_size,
                    'created_at': created_at,
                    'file_name': file_name,
                    'file_path': file_path,
                    'web_path': f"file://{self.server_ip}/{service_name}/{file_path}",
                }
                is_indexable = indexing_rules.should_index(file_details)
                if is_indexable \
                        and parse(time_range.get('start_time')) < parse(updated_at) \
                        and parse(updated_at) <= parse(time_range.get('end_time')):
                    file_id = file.file_id if file.file_id else hash_id(file_name, path)
                    storage.update({file_id: file_details})
        return storage
    def retrieve_permission(self, smb_connection, service_name, file_path):
        """This method is used to retrieve permissions from Network Drives.
        :param smb_connection: SMB connection object
        :param service_name: name of the drive
        :param file_path: path of the file on the Network Drives
        :returns: hash of allow and deny permission lists
        """
        try:
            security_info = smb_connection.getSecurity(service_name, rf'{file_path}')
        except Exception as exception:
            self.logger.exception(
                f"Unknown error while fetching permission details for file {file_path}. Error: {exception}")
            # Keep the return type consistent with the success path so callers can still index the document
            return {'allow': [], 'deny': []}
        allow_users = []
        deny_users = []
        if security_info.dacl:
            aces = security_info.dacl.aces
            for ace in (aces or []):
                sid = str(ace.sid)
                if ace.type == ACCESS_ALLOWED_TYPE or ace.mask == ACCESS_MASK_DENIED_WRITE_PERMISSION:
                    allow_users.append(sid)
                if (ace.type == ACCESS_DENIED_TYPE and ace.mask != ACCESS_MASK_DENIED_WRITE_PERMISSION) or \
                        ace.mask == ACCESS_MASK_ALLOWED_WRITE_PERMISSION:
                    deny_users.append(sid)
                if not fetch_users_from_csv_file(self.user_mapping, self.logger).get(sid):
                    self.logger.warning(
                        f"No mapping found for sid:{sid} in csv file. Please add the sid->user mapping for the "
                        f"{sid} and rerun the permission_sync_command to sync the user mappings.")
        return {'allow': allow_users, 'deny': deny_users}

    def fetch_files(self, service_name, path_list, time_range, indexing_rules):
        """This method is used to fetch and index files to Workplace Search
        :param service_name: name of the drive
        :param path_list: list of folder paths inside the network drives
        :param time_range: start and end time
        :param indexing_rules: object of indexing_rules
        """
        schema = adapter.FILES
        documents = []
        smb_connection = self.network_drives_client.connect()
        if smb_connection:
            for folder_path in path_list:
                storage = self.extract_files(smb_connection, service_name, folder_path, time_range, indexing_rules)
                for file_id, file_details in storage.items():
                    doc = {}
                    for field, file_field in schema.items():
                        doc[field] = file_details.get(file_field)
                    doc.update({'body': {}, 'id': str(file_id)})
                    if self.enable_document_permission:
                        permissions = self.retrieve_permission(smb_connection, service_name,
                                                               file_details.get("file_path"))
                        doc['_allow_permissions'] = permissions['allow']
                        doc['_deny_permissions'] = permissions['deny']
                    doc['body'] = self.fetch_file_content(service_name, file_details, smb_connection)
                    documents.append(doc)
            smb_connection.close()
        else:
            raise ConnectionError("Unknown error while connecting to network drives")
        return documents

    def fetch_file_content(self, service_name, file_details, smb_connection):
        """This method is used to fetch content from a Network Drives file
        :param service_name: name of the drive
        :param file_details: dictionary containing file details
        :param smb_connection: SMB connection object
        """
        file_obj = tempfile.NamedTemporaryFile()
        try:
            smb_connection.retrieveFile(service_name, file_details.get('file_path'), file_obj)
            file_obj.seek(0)
            try:
                extracted_content = extract(file_obj.read())
                file_obj.close()
                return extracted_content
            except TikaException as exception:
                self.logger.exception(
                    f"Error while extracting contents from file {file_details.get('file_name')} via Tika Parser. "
                    f"Error: {exception}")
        except Exception as exception:
            if isinstance(exception, OSError) and exception.errno == errno.ENOSPC:
                self.logger.exception(
                    f"We reached the memory limit for extracting the file: {file_details.get('file_name')}. "
                    f"Skipping the contents of this file. Error: {exception}")
            else:
                self.logger.exception(
                    f"Cannot read the contents of the file {file_details.get('file_name')}. Error: {exception}")
        file_obj.close()
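
For orientation, below is a minimal sketch of how this class might be driven end to end. It assumes the connector talks to the share through pysmb (which matches the listPath/getSecurity/retrieveFile calls above); StubConfig, StubClient, and AllowAllRules are hypothetical stand-ins for the real configuration, network drive client, and indexing-rules objects, which are not defined in this file, and the paths and credentials are placeholders.

import logging

from smb.SMBConnection import SMBConnection

from ees_network_drive.files import Files


class StubConfig:
    """Hypothetical stand-in for the connector's configuration object."""

    def __init__(self, values):
        self.values = values

    def get_value(self, key):
        return self.values.get(key)


class StubClient:
    """Hypothetical stand-in for the network drive client; returns a pysmb connection."""

    def __init__(self, username, password, server_name, server_ip):
        self.username, self.password = username, password
        self.server_name, self.server_ip = server_name, server_ip

    def connect(self):
        connection = SMBConnection(self.username, self.password, "ees-connector", self.server_name,
                                   use_ntlm_v2=True, is_direct_tcp=True)
        return connection if connection.connect(self.server_ip, 445) else None


class AllowAllRules:
    """Hypothetical indexing-rules stand-in that accepts every file."""

    def should_index(self, file_details):
        return True


logger = logging.getLogger("network_drive_sync")
config = StubConfig({
    "network_drive.path": "Users/shared_folder",
    "network_drive.server_ip": "127.0.0.1",
    "network_drive_enterprise_search.user_mapping": "user_mapping.csv",
    "enable_document_permission": False,
})
client = StubClient("username", "password", "SERVER", "127.0.0.1")
files = Files(logger, config, client)

# The share (service) name is the first segment of network_drive.path.
service_name = config.get_value("network_drive.path").split("/")[0]

# Walk the share once to collect folder paths, then fetch the documents
# for files updated inside the given RFC 3339 time window.
smb_connection = client.connect()
folder_paths = files.recursive_fetch(smb_connection, service_name, "/", [])
smb_connection.close()

time_range = {"start_time": "1970-01-01T00:00:00Z", "end_time": "2024-01-01T00:00:00Z"}
documents = files.fetch_files(service_name, folder_paths, time_range, AllowAllRules())
logger.info(f"Fetched {len(documents)} documents from {service_name}")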