connectors/service_cli.py (164 lines of code) (raw):

# # Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one # or more contributor license agreements. Licensed under the Elastic License 2.0; # you may not use this file except in compliance with the Elastic License 2.0. # """ Command Line Interface. This is the main entry point of the framework. When the project is installed as a Python package, an `elastic-ingest` executable is added in the PATH and executes the `main` function of this module, which starts the service. """ import asyncio import functools import json import logging import os import signal import click from click import ClickException, UsageError from connectors import __version__ from connectors.build_info import __build_info__ from connectors.config import load_config from connectors.content_extraction import ContentExtraction from connectors.logger import logger, set_logger from connectors.preflight_check import PreflightCheck from connectors.services import get_services from connectors.source import get_source_klass, get_source_klasses __all__ = ["main"] from connectors.utils import sleeps_for_retryable async def _start_service(actions, config, loop): """Starts the service. Steps: - performs a preflight check using `PreflightCheck` - instantiates a `MultiService` instance and runs its `run` async function """ preflight = PreflightCheck(config, __version__) for sig in (signal.SIGINT, signal.SIGTERM): loop.add_signal_handler(sig, functools.partial(preflight.shutdown, sig)) try: success, is_serverless = await preflight.run() if not success: return -1 # injecting this value into the config allows us to avoid checking the server again before requests config["elasticsearch"]["serverless"] = is_serverless finally: for sig in (signal.SIGINT, signal.SIGTERM): loop.remove_signal_handler(sig) multi_service = get_services(actions, config) def _shutdown(signal_name): sleeps_for_retryable.cancel(signal_name) multi_service.shutdown(signal_name) for sig in (signal.SIGINT, signal.SIGTERM): loop.add_signal_handler(sig, functools.partial(_shutdown, sig.name)) if "PERF8" in os.environ: import perf8 async with perf8.measure(): return await multi_service.run() else: return await multi_service.run() def _get_uvloop(): import uvloop return uvloop def get_event_loop(uvloop=False): if uvloop: # activate uvloop if lib is present try: import uvloop asyncio.set_event_loop_policy(_get_uvloop().EventLoopPolicy()) except Exception as e: logger.warning( f"Unable to enable uvloop: {e}. Running with default event loop" ) pass try: loop = asyncio.get_running_loop() except RuntimeError: loop = asyncio.get_event_loop_policy().get_event_loop() if loop is None: loop = asyncio.new_event_loop() asyncio.set_event_loop(loop) return loop def run(action, config_file, log_level, filebeat, service_type, uvloop): """Loads the config file, sets the logger and executes an action. Actions: - list: prints out a list of all connectors and exits - poll: starts the event loop and run forever (default) """ logger.info(f"Running connector service version {__version__}") # load config config = {} try: config = load_config(config_file) ContentExtraction.set_extraction_config( config.get("extraction_service", None) ) # Not perfect, let's revisit except Exception as e: # If something goes wrong while parsing config file, we still want # to set up the logger so that Cloud deployments report errors to # logs properly set_logger(logging.INFO, filebeat=filebeat) msg = f"Could not parse {config_file}. Check logs for more information" logger.exception(f"{msg}.\n{e}") raise ClickException(msg) from e # Precedence: CLI args >> Config Setting >> INFO set_logger( log_level or config["service"]["log_level"] or logging.INFO, filebeat=filebeat, ) # just display the list of connectors if action == ("list",): print("Registered connectors:") # noqa: T201 for source in get_source_klasses(config): print(f"- {source.name}") # noqa: T201 print("Bye") # noqa: T201 return 0 if action == ("config",): print( # noqa: T201 f"Getting default configuration for service type {service_type}" ) source_list = config["sources"] if service_type not in source_list: msg = f"Could not find a connector for service type {service_type}" raise UsageError(msg) source_klass = get_source_klass(source_list[service_type]) print( # noqa: T201 json.dumps(source_klass.get_simple_configuration(), indent=2) ) print("Bye") # noqa: T201 return 0 if "list" in action: msg = "Cannot use the `list` action with other actions" raise UsageError(msg) if "config" in action: msg = "Cannot use the `config` action with other actions" raise UsageError(msg) loop = get_event_loop(uvloop) coro = _start_service(action, config, loop) try: return loop.run_until_complete(coro) except asyncio.CancelledError: return 0 finally: logger.info("Bye") @click.command() @click.version_option(__build_info__, "-v", "--version", message="%(version)s") @click.option( "--action", type=click.Choice( [ "schedule", "sync_content", "sync_access_control", "list", "config", "cleanup", ], case_sensitive=False, ), multiple=True, default=["schedule", "sync_content", "sync_access_control", "cleanup"], help="What elastic-ingest should do.", ) @click.option( "-c", "--config-file", type=click.Path(), default=os.path.join(os.path.dirname(__file__), "..", "config.yml"), show_default=True, help="Configuration file.", ) @click.option( "--log-level", type=click.Choice(["DEBUG", "INFO", "WARNING", "ERROR", "CRITICAL"]), help="Set log level for the service.", ) @click.option( "--debug", "log_level", flag_value="DEBUG", help="Run the event loop in debug mode (alias for --log-level DEBUG).", ) @click.option( "--filebeat", is_flag=True, default=False, help="Output in filebeat format." ) @click.option( "--service-type", type=str, default=None, help="Service type to get default configuration for if action is config.", ) @click.option("--uvloop", is_flag=True, default=False, help="Use uvloop if possible.") def main(action, config_file, log_level, filebeat, service_type, uvloop): """Entry point to the service, responsible for all operations. Parses the arguments and calls `run` with them. """ return run(action, config_file, log_level, filebeat, service_type, uvloop)