# ees_network_drive/sync_network_drives.py
#
# Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
# or more contributor license agreements. Licensed under the Elastic License 2.0;
# you may not use this file except in compliance with the Elastic License 2.0.
#
"""This module allows to sync data to Elastic Enterprise Search.
It's possible to run full syncs and incremental syncs with this module.
"""
import copy
import os
import threading
from pathlib import Path
from .files import Files


class SyncNetworkDrives:
"""This class contains common logic for fetching from Network Drives"""
def __init__(
self,
logger,
config,
time_range,
network_drive_client,
indexing_rules,
queue,
):
self.logger = logger
self.config = config
self.time_range = time_range
self.drive_path = Path(self.config.get_value("network_drive.path"))
self.network_drive_client = network_drive_client
self.indexing_rules = indexing_rules
self.network_drives_sync_thread_count = config.get_value("network_drives_sync_thread_count")
self.queue = queue

    def get_storage_with_collection(self, local_storage):
"""Returns a dictionary containing the locally stored IDs of files fetched from network drives
:param local_storage: The object of the local storage used to store the indexed document IDs
"""
storage_with_collection = {"global_keys": {}, "delete_keys": {}}
ids_collection = local_storage.load_storage()
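        # Everything indexed in the previous run starts out as a deletion candidate,
        # so a later deletion pass can prune documents for files no longer on the drive.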
storage_with_collection["delete_keys"] = copy.deepcopy(ids_collection.get("global_keys"))
        if not ids_collection.get("global_keys"):
ids_collection["global_keys"] = {"files": {}}
storage_with_collection["global_keys"] = copy.deepcopy(ids_collection["global_keys"])
return storage_with_collection
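
    # A minimal sketch of the structure returned above, assuming load_storage()
    # yields {"global_keys": {}} on a first run:
    #   {"global_keys": {"files": {}}, "delete_keys": {}}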

    def connect_and_get_all_folders(self):
        """Connects to the Network Drive and returns a list of all the folders present on the drive"""
smb_connection = self.network_drive_client.connect()
        if not smb_connection:
            raise Exception("Unknown error while connecting to the Network Drive")
        files = Files(self.logger, self.config, self.network_drive_client)
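        # The first component of the configured path is the SMB share (service)
        # name; the remaining components form the folder path within that share.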
store = files.recursive_fetch(
smb_connection=smb_connection,
service_name=self.drive_path.parts[0],
path=os.path.join(*self.drive_path.parts[1:]),
store=[],
)
smb_connection.close()
return store
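
    # The folder list returned above is typically split into one partition per
    # sync thread before perform_sync is called. A sketch, where
    # split_into_buckets is a hypothetical helper:
    #   partitions = split_into_buckets(folders, network_drives_sync_thread_count)
    #   for paths in partitions:
    #       thread_pool.submit(sync.perform_sync, drive, paths)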

    def perform_sync(self, drive, partition_paths):
        """Fetches all the files under the given folder paths from the Network Drive
        and appends them to the shared queue
        :param drive: The Network Drive name
        :param partition_paths: List of folder paths assigned to the current sync thread
        Returns:
            ids_storage: dictionary mapping the ID of each fetched file to its path on the Network Drive
        """
if not partition_paths:
return {}
files = Files(self.logger, self.config, self.network_drive_client)
documents_to_index = []
self.logger.info(f"Thread: [{threading.get_ident()}] fetching all the files for folder {partition_paths}")
ids_storage = {}
try:
fetched_documents = files.fetch_files(
self.drive_path.parts[0],
partition_paths,
self.time_range,
self.indexing_rules,
)
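            # Hand the fetched documents to the shared queue for the consumer
            # (indexing) threads, and keep a local copy to record their IDs below.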
self.queue.append_to_queue(fetched_documents)
documents_to_index.extend(fetched_documents)
except Exception as exception:
self.logger.error(f"Error while fetching files for the path: {partition_paths}. Error: {exception}")
        for doc in documents_to_index:
            ids_storage[doc["id"]] = doc["path"]
return ids_storage
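

# A minimal end-to-end sketch (not part of the connector). The Configuration,
# NetworkDrive client, IndexingRules, ConnectorQueue, and LocalStorage
# collaborators named here are assumptions based on the interfaces used above:
#
#   config = Configuration("network_drive_connector.yml")
#   client = NetworkDrive(config, logger)
#   sync = SyncNetworkDrives(logger, config, time_range, client, rules, queue)
#   storage = sync.get_storage_with_collection(LocalStorage(logger))
#   folders = sync.connect_and_get_all_folders()
#   ids = sync.perform_sync(config.get_value("network_drive.path"), folders)
#   storage["global_keys"]["files"].update(ids)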