connectors/services/job_scheduling.py (210 lines of code) (raw):
#
# Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
# or more contributor license agreements. Licensed under the Elastic License 2.0;
# you may not use this file except in compliance with the Elastic License 2.0.
#
"""
Event loop
- polls for work by calling Elasticsearch on a regular basis
- instantiates connector plugins
- mirrors an Elasticsearch index with a collection of documents
"""
import functools
from datetime import datetime, timezone
from connectors.es.client import License, with_concurrency_control
from connectors.es.index import DocumentNotFoundError
from connectors.protocol import (
ConnectorIndex,
DataSourceError,
JobTriggerMethod,
JobType,
ServiceTypeNotConfiguredError,
ServiceTypeNotSupportedError,
Status,
SyncJobIndex,
)
from connectors.services.base import BaseService
from connectors.source import get_source_klass
from connectors.utils import ConcurrentTasks
class JobSchedulingService(BaseService):
    """Service that polls for connectors and creates scheduled sync jobs.

    On every polling cycle the service iterates the supported connectors,
    hands each one to ``_schedule`` through a bounded task pool, waits for
    the pool to drain and then sleeps for ``idling`` seconds.
    """

    name = "schedule"

    def __init__(self, config):
        """Read the scheduling settings and create the scheduling task pool.

        Args:
            config: Service configuration; must contain a ``"sources"`` entry.
                ``self.service_config`` is read right after the base
                initialiser runs (presumably populated by ``BaseService`` --
                confirm against the base class).
        """
        super().__init__(config, "job_scheduling_service")
        self.idling = self.service_config["idling"]
        self.heartbeat_interval = self.service_config["heartbeat"]
        self.source_list = config["sources"]
        # Flipped to False by the very first ``_schedule`` call.
        self.first_run = True
        self.last_wake_up_time = datetime.now(timezone.utc)
        # ``None`` when the key is absent from the service configuration.
        self.max_concurrency = self.service_config.get(
            "max_concurrent_scheduling_tasks"
        )
        self.schedule_tasks_pool = ConcurrentTasks(max_concurrency=self.max_concurrency)

    def stop(self):
        """Stop the service and cancel the scheduling task pool."""
        super().stop()
        self.schedule_tasks_pool.cancel()

    async def _schedule(self, connector):
        """Prepare and validate one connector, then try to schedule its syncs.

        The method returns early (without scheduling) when the service is
        terminating, when the connector cannot be prepared, when it is in the
        ``CREATED`` or ``NEEDS_CONFIGURATION`` status, or when validating the
        configuration / pinging the data source fails.

        Args:
            connector: Connector document yielded by the connector index.

        Raises:
            DataSourceError: Re-raised when ``connector.prepare`` raises it,
                or raised when the connector's service type has no entry in
                the configured sources.
        """
        # To do some first-time stuff: only the very first call made on this
        # service instance sees ``first_run`` as True.
        just_started = self.first_run
        self.first_run = False

        if self.running is False:
            connector.log_debug("Skipping run because service is terminating")
            return

        if connector.native:
            connector.log_debug("Natively supported")

        try:
            await connector.prepare(
                self.connectors.get(connector.id, {}), self.config["sources"]
            )
        except DocumentNotFoundError:
            connector.log_error("Couldn't find connector")
            return
        except ServiceTypeNotConfiguredError:
            connector.log_error("Service type is not configured")
            return
        except ServiceTypeNotSupportedError:
            connector.log_debug(f"Can't handle source of type {connector.service_type}")
            return
        except DataSourceError as e:
            # Log before persisting the error so the original failure is not
            # lost from the log if ``connector.error`` itself fails; this is
            # the same order the validation handler below uses.
            connector.log_error(e, exc_info=True)
            await connector.error(e)
            raise

        # the heartbeat is always triggered
        await connector.heartbeat(self.heartbeat_interval, force=just_started)

        connector.log_debug(f"Status is {connector.status}")

        # we trigger a sync
        if connector.status == Status.CREATED:
            connector.log_info(
                "Connector has just been created and cannot sync. Wait for Kibana to initialise connector correctly before proceeding."
            )
            return

        if connector.status == Status.NEEDS_CONFIGURATION:
            connector.log_info(
                "Connector is not configured yet. Finish connector configuration in Kibana to make it possible to run a sync."
            )
            return

        if connector.service_type not in self.source_list:
            msg = f"Couldn't find data source class for {connector.service_type}"
            raise DataSourceError(msg)

        source_klass = get_source_klass(self.source_list[connector.service_type])
        data_source = source_klass(connector.configuration)
        data_source.set_logger(connector.logger)

        try:
            connector.log_debug("Validating configuration")
            data_source.validate_config_fields()
            await data_source.validate_config()

            connector.log_debug("Pinging the backend")
            await data_source.ping()

            if connector.features.sync_rules_enabled():
                await connector.validate_filtering(validator=data_source)

            connector.log_debug(
                "Connector is configured correctly and can reach the data source"
            )
            await connector.connected()
        except Exception as e:
            # Any validation/ping failure is recorded on the connector and
            # ends this scheduling attempt without propagating.
            connector.log_error(e, exc_info=True)
            await connector.error(e)
            return
        finally:
            # The data source is only needed for validation; always close it.
            await data_source.close()

        if connector.features.document_level_security_enabled():
            (
                is_platinum_license_enabled,
                license_enabled,
            ) = await self.connector_index.has_active_license_enabled(License.PLATINUM)  # pyright: ignore

            if is_platinum_license_enabled:
                await self._try_schedule_sync(connector, JobType.ACCESS_CONTROL)
            else:
                connector.log_error(
                    f"Minimum required Elasticsearch license: '{License.PLATINUM.value}'. Actual license: '{license_enabled.value}'. Skipping access control sync scheduling..."
                )

        # Incremental syncs need both the connector feature flag and support
        # declared on the data source class.
        if (
            connector.features.incremental_sync_enabled()
            and source_klass.incremental_sync_enabled
        ):
            await self._try_schedule_sync(connector, JobType.INCREMENTAL)

        await self._try_schedule_sync(connector, JobType.FULL)

    async def _run(self):
        """Main event loop.

        Polls the connector index while the service is running, submits one
        ``_schedule`` task per connector to the task pool, waits for the pool
        to finish and sleeps ``idling`` seconds between cycles.

        Returns:
            ``0`` once the loop has ended and the indices have been closed.
        """
        # Reset both handles first and build them inside the ``try`` so that
        # the cleanup below also runs (and closes whatever was created) when
        # one of the constructors raises.
        self.connector_index = None
        self.sync_job_index = None

        try:
            self.connector_index = ConnectorIndex(self.es_config)
            self.sync_job_index = SyncJobIndex(self.es_config)

            native_service_types = self.config.get("native_service_types", []) or []
            if native_service_types:
                self.logger.debug(
                    f"Native support for job scheduling for {', '.join(native_service_types)}"
                )
            else:
                self.logger.debug("No native service types configured for job scheduling")

            connector_ids = list(self.connectors)

            self.logger.info(
                f"Job Scheduling Service started, listening to events from {self.es_config['host']}"
            )

            while self.running:
                try:
                    self.logger.debug(
                        f"Polling every {self.idling} seconds for Job Scheduling"
                    )

                    async for connector in self.connector_index.supported_connectors(
                        native_service_types=native_service_types,
                        connector_ids=connector_ids,
                    ):
                        # ``try_put`` returning a falsy value means the task
                        # was not accepted; the connector is skipped for this
                        # cycle and only a debug message is emitted.
                        if not self.schedule_tasks_pool.try_put(
                            functools.partial(self._schedule, connector)
                        ):
                            connector.log_debug(
                                f"Job Scheduling service is already running {self.max_concurrency} concurrent scheduling jobs and can't run more at this point. Increase 'max_concurrent_scheduling_tasks' in config if you want the service to run more concurrent scheduling jobs."  # pyright: ignore
                            )
                except Exception as e:
                    self.logger.error(e, exc_info=True)
                    self.raise_if_spurious(e)

                # Wait for every scheduling task submitted in this cycle.
                await self.schedule_tasks_pool.join()

                # Immediately break instead of sleeping
                if not self.running:
                    break

                self.last_wake_up_time = datetime.now(timezone.utc)
                await self._sleeps.sleep(self.idling)
        finally:
            if self.connector_index is not None:
                self.connector_index.stop_waiting()
                await self.connector_index.close()
            if self.sync_job_index is not None:
                self.sync_job_index.stop_waiting()
                await self.sync_job_index.close()
        return 0

    async def _try_schedule_sync(self, connector, job_type):
        """Create a scheduled sync job of ``job_type`` if one is due.

        Args:
            connector: Connector to schedule the sync for.
            job_type: ``JobType`` member identifying the kind of sync.
        """
        # Both timestamps are captured once so the nested check works against
        # a fixed scheduling window.
        this_wake_up_time = datetime.now(timezone.utc)
        last_wake_up_time = self.last_wake_up_time

        self.logger.debug(
            f"Scheduler woke up at {this_wake_up_time}. Previously woke up at {last_wake_up_time}."
        )

        # NOTE(review): ``with_concurrency_control`` presumably retries the
        # check on concurrent document updates -- confirm in
        # ``connectors.es.client``.
        @with_concurrency_control()
        async def _should_schedule(job_type):
            """Return True when a sync is due and was marked as scheduled."""
            try:
                await connector.reload()
            except DocumentNotFoundError:
                connector.log_error("Couldn't reload connector")
                return False

            job_type_value = job_type.value

            last_sync_scheduled_at = connector.last_sync_scheduled_at_by_job_type(
                job_type
            )

            self.logger.debug(f"Last sync was scheduled at {last_sync_scheduled_at}")

            # A schedule timestamp newer than our previous wake-up means the
            # sync was already scheduled since then.
            if (
                last_sync_scheduled_at is not None
                and last_sync_scheduled_at > last_wake_up_time
            ):
                connector.log_debug(
                    f"A scheduled '{job_type_value}' sync is created by another connector instance, skipping..."
                )
                return False

            try:
                next_sync = connector.next_sync(job_type, last_wake_up_time)
                connector.log_debug(f"Next '{job_type_value}' sync is at {next_sync}")
            except Exception as e:
                connector.log_error(e, exc_info=True)
                await connector.error(str(e))
                return False

            if next_sync is None:
                connector.log_debug(f"'{job_type_value}' sync scheduling is disabled")
                return False

            if this_wake_up_time < next_sync:
                next_sync_due = (next_sync - datetime.now(timezone.utc)).total_seconds()
                connector.log_debug(
                    f"Next '{job_type_value}' sync due in {int(next_sync_due)} seconds"
                )
                return False

            # Record the scheduling time on the connector before the job is
            # created by the caller.
            await connector.update_last_sync_scheduled_at_by_job_type(
                job_type, next_sync
            )

            return True

        if await _should_schedule(job_type):
            connector.log_info(f"Creating a scheduled '{job_type.value}' sync...")
            await self.sync_job_index.create(
                connector=connector,
                trigger_method=JobTriggerMethod.SCHEDULED,
                job_type=job_type,
            )