ees_sharepoint/base_command.py (50 lines of code) (raw):
#
# Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
# or more contributor license agreements. Licensed under the Elastic License 2.0;
# you may not use this file except in compliance with the Elastic License 2.0.
#
"""Module contains a base command interface.
Connector can run multiple commands such as full-sync, incremental-sync,
etc. This module provides convenience interface defining the shared
objects and methods that will can be used by commands."""
import logging
# For Python>=3.8 cached_property should be imported from functools,
# and for the prior versions it should be imported from cached_property
try:
from functools import cached_property
except ImportError:
from cached_property import cached_property
from concurrent.futures import ThreadPoolExecutor, as_completed
from .configuration import Configuration
from .enterprise_search_wrapper import EnterpriseSearchWrapper
from .local_storage import LocalStorage
from .sharepoint_client import SharePoint
class BaseCommand:
"""Base interface for all module commands.
Inherit from it and implement 'execute' method, then add
code to cli.py to register this command."""
def __init__(self, args):
self.args = args
def execute(self):
"""Run the command.
This method is overridden by actual commands with logic
that is specific to each command implementing it."""
raise NotImplementedError
@cached_property
def logger(self):
"""Get the logger instance for the running command.
log level will be determined by the configuration
setting log_level.
"""
log_level = self.config.get_value("log_level")
logger = logging.getLogger(__name__)
logger.propagate = False
logger.setLevel(log_level)
handler = logging.StreamHandler()
# Uncomment the following lines to output logs in ECS-compatible format
# formatter = ecs_logging.StdlibFormatter()
# handler.setFormatter(formatter)
handler.setLevel(log_level)
logger.addHandler(handler)
return logger
@cached_property
def workplace_search_custom_client(self):
"""Get the workplace search custom client instance for the running command."""
return EnterpriseSearchWrapper(self.logger, self.config, self.args)
@cached_property
def config(self):
"""Get the configuration for the connector for the running command."""
file_name = self.args.config_file
return Configuration(file_name)
@cached_property
def sharepoint_client(self):
"""Get the sharepoint client instance for the running command."""
return SharePoint(self.config, self.logger)
@staticmethod
def producer(thread_count, func, args, items, wait=False):
"""Apply async calls using multithreading to the targeted function
:param thread_count: Total number of threads to be spawned
:param func: The target function on which the async calls would be made
:param args: Arguments for the targeted function
:param items: iterator of partition
:param wait: wait until job completes if true, otherwise returns immediately
"""
with ThreadPoolExecutor(max_workers=thread_count) as executor:
futures = (executor.submit(func, *args, item) for item in items)
if wait:
result = [future.result() for future in as_completed(futures)]
return result
@staticmethod
def consumer(thread_count, func):
"""Apply async calls using multithreading to the targeted function
:param thread_count: Total number of threads to be spawned
:param func: The target function on which the async calls would be made
"""
with ThreadPoolExecutor(max_workers=thread_count) as executor:
for _ in range(thread_count):
executor.submit(func)
@cached_property
def local_storage(self):
"""Get the object for local storage to fetch and update ids stored locally"""
return LocalStorage(self.logger)