ees_network_drive/incremental_sync_command.py
#
# Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
# or more contributor license agreements. Licensed under the Elastic License 2.0;
# you may not use this file except in compliance with the Elastic License 2.0.
#
"""This module allows to run an incremental sync against a Network Drives Server instance.
It will attempt to sync documents that have changed or have been added in the
third-party system recently and ingest them into Enterprise Search instance.
Recency is determined by the time when the last successful incremental or full job
was ran.
"""
from .base_command import BaseCommand
from .checkpointing import Checkpoint
from .connector_queue import ConnectorQueue
from .local_storage import LocalStorage
from .sync_enterprise_search import SyncEnterpriseSearch
from .sync_network_drives import SyncNetworkDrives
from .utils import get_current_time, split_list_into_buckets


INDEXING_TYPE = "incremental"


class IncrementalSyncCommand(BaseCommand):
    """This class starts execution of the incremental sync feature."""

    def start_producer(self, queue, time_range):
        """This method starts async calls for the producer, which is responsible
        for fetching documents from the network drive and pushing them into the shared queue
        :param queue: Shared queue to store the fetched documents
        :param time_range: Time range dictionary storing the start time and end time
        """
        logger = self.logger
        sync_network_drives = SyncNetworkDrives(
            logger,
            self.config,
            time_range,
            self.network_drive_client,
            self.indexing_rules,
            queue,
        )
        thread_count = self.config.get_value("network_drives_sync_thread_count")
        drive = self.config.get_value("network_drive.server_name")
        try:
            local_storage = LocalStorage(logger)
            storage_with_collection = sync_network_drives.get_storage_with_collection(local_storage)
            store = sync_network_drives.connect_and_get_all_folders()
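            # Illustrative example, not taken from utils: split_list_into_buckets
            # is assumed to partition the folder list roughly evenly, e.g.
            #   split_list_into_buckets(["f1", "f2", "f3"], 2) -> [["f1", "f2"], ["f3"]]
            # so that each producer thread gets one bucket of paths to sync.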
            partition_paths = split_list_into_buckets(store, thread_count)
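            # Assumption based on how global_keys is used below: create_jobs
            # (from BaseCommand) runs perform_sync once per bucket across
            # thread_count threads and merges the per-thread results into a
            # single dictionary of indexed file keys.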
            global_keys = self.create_jobs(thread_count, sync_network_drives.perform_sync, (drive,), partition_paths)
            try:
                storage_with_collection["global_keys"]["files"].update(global_keys)
            except ValueError as value_error:
                logger.error(f"Exception while updating storage: {value_error}")
            # Send an end signal to each live consumer thread to notify it to stop
            # watching the queue for incoming documents
            for _ in range(self.config.get_value("enterprise_search_sync_thread_count")):
                queue.end_signal()
        except Exception:
            logger.error("Error while fetching from the network drive. Checkpoint not saved.")
            raise
        local_storage.update_storage(storage_with_collection)

    def start_consumer(self, queue):
        """This method starts async calls for the consumer, which is responsible for indexing documents to Enterprise Search
        :param queue: Shared queue to fetch the stored documents
        """
        logger = self.logger
        thread_count = self.config.get_value("enterprise_search_sync_thread_count")
        sync_es = SyncEnterpriseSearch(self.config, logger, self.workplace_search_custom_client, queue)
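        # Each consumer thread runs perform_sync, which (by assumption, given
        # the end_signal sent by the producer) keeps pulling documents off the
        # shared queue and indexing them until it receives an end signal.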
        self.create_jobs(thread_count, sync_es.perform_sync, (), None)

    def execute(self):
        """This function executes the incremental sync."""
        config = self.config
        logger = self.logger
        current_time = get_current_time()
        checkpoint = Checkpoint(config, logger)
        drive = config.get_value("network_drive.server_name")
        start_time, end_time = checkpoint.get_checkpoint(current_time, drive)
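        # Per the module docstring: start_time is the timestamp recorded by the
        # last successful incremental or full job (get_checkpoint presumably
        # falls back to a configured default on the first run), and end_time is
        # the current time, so only recently changed documents are fetched.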
        time_range = {"start_time": start_time, "end_time": end_time}
        logger.info(f"Indexing started at: {current_time}")
        queue = ConnectorQueue(logger)
        self.start_producer(queue, time_range)
        self.start_consumer(queue)
        checkpoint.set_checkpoint(current_time, INDEXING_TYPE, drive)
        logger.info(f"Indexing ended at: {get_current_time()}")