# ees_network_drive/deletion_sync_command.py (63 lines of code) (raw):
#
# Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
# or more contributor license agreements. Licensed under the Elastic License 2.0;
# you may not use this file except in compliance with the Elastic License 2.0.
#
"""This module allows to remove recently deleted documents from Elastic Enterprise Search.
Documents that were deleted in Network Drives will still be available in
Elastic Enterprise Search until a full sync happens, or until this module is used.
"""
import os
from . import constant
from .base_command import BaseCommand
from .files import Files
from .utils import (group_files_by_folder_path,
split_documents_into_equal_chunks)
class DeletionSyncCommand(BaseCommand):
    """Command that removes documents for deleted files from Elastic Enterprise Search.

    It checks the files recorded under ``delete_keys`` in the local storage
    against the Network Drive, deletes the documents of the files that are no
    longer present, and persists the updated storage.
    """

    def __init__(self, args):
        """Initializes the command and reads the configured server name.

        :param args: command arguments, forwarded unchanged to BaseCommand
        """
        super().__init__(args)
        self.logger.debug("Initializing the deletion sync class")
        self.server_name = self.config.get_value("network_drive.server_name")

    def get_deleted_files(self, drive_name, ids):
        """Fetches the ids of the files that were deleted from the Network Drives

        :param drive_name: service name of the Network Drives
        :param ids: structure containing ids of all files; ``ids["delete_keys"]["files"]``
            is read as a mapping of file id to file path
        Returns:
            ids_list: list of file ids that got deleted from Network Drives
        Raises:
            ConnectionError: if the network drive client does not return a connection
        """
        file_details = ids["delete_keys"].get("files")
        # Return before grouping or connecting: with no candidates there is
        # nothing to check, and the grouping helper never sees None.
        if not file_details:
            self.logger.info(f"No files found to be deleted for drive: {drive_name}")
            return []

        file_structure = group_files_by_folder_path(file_details)
        ids_list = []
        # These stay plain lists because they are handed to the Files helper below.
        # NOTE(review): the helper presumably appends to ids_list, visited_folders
        # and deleted_folders in place - confirm against Files.
        deleted_folders = []
        visited_folders = []

        smb_connection = self.network_drive_client.connect()
        if not smb_connection:
            raise ConnectionError("Unknown error while connecting to network drives")

        files = Files(self.logger, self.config, self.network_drive_client)
        for file_id, file_path in file_details.items():
            folder_path, _ = os.path.split(file_path)
            if folder_path in deleted_folders:
                # The folder is already known to be gone, so this file is gone too.
                ids_list.append(file_id)
                continue
            if folder_path in visited_folders:
                # The folder was already checked on an earlier iteration.
                continue
            folder_deleted = files.is_file_present_on_network_drive(
                smb_connection,
                drive_name,
                folder_path,
                file_structure,
                ids_list,
                visited_folders,
                deleted_folders,
            )
            if folder_deleted:
                ids_list.append(file_id)
        return ids_list

    def sync_deleted_files(self, ids_list, ids):
        """Invokes delete documents api for the deleted file ids to remove them from
        workplace search and drops them from the ``global_keys`` bookkeeping.

        :param ids_list: list of ids of files to be deleted from Enterprise Search
        :param ids: structure containing ids of all files
        Returns:
            ids: updated structure containing ids of all files after performing deletion
        """
        if ids_list:
            for chunk in split_documents_into_equal_chunks(ids_list, constant.BATCH_SIZE):
                self.workplace_search_custom_client.delete_documents(chunk)

            indexed_files = ids["global_keys"]["files"]
            for file_id in ids_list:
                # Pop with a default: a duplicate or already-absent id must not raise
                # after the documents were deleted, or the storage update is skipped.
                indexed_files.pop(file_id, None)
        return ids

    def execute(self):
        """Runs the deletion sync logic"""
        self.logger.info("Starting the deletion sync..")
        ids = self.local_storage.load_storage()
        self.logger.info(f"Starting the deletion sync for drive: {self.server_name}")

        if ids["delete_keys"].get("files"):
            deleted_ids = self.get_deleted_files(self.server_name, ids)
            ids = self.sync_deleted_files(deleted_ids, ids)
            self.logger.info("Completed the syncing of deleted files")
        else:
            self.logger.debug(f"No objects present to be deleted for the drive: {self.server_name}")

        # Clear the processed delete keys before persisting the storage.
        ids["delete_keys"] = {}
        self.logger.info("Updating the local storage")
        self.local_storage.update_storage(ids)