lib/app/dispatcher.rb (132 lines of code) (raw):
#
# Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
# or more contributor license agreements. Licensed under the Elastic License;
# you may not use this file except in compliance with the Elastic License.
#
# frozen_string_literal: true
require 'active_support'
require 'concurrent'
require 'connectors'
require 'core'
require 'utility'
require 'app/config'
module App
class Dispatcher
POLL_INTERVAL = (App::Config.poll_interval || 3).to_i
TERMINATION_TIMEOUT = (App::Config.termination_timeout || 60).to_i
HEARTBEAT_INTERVAL = (App::Config.heartbeat_interval || 60 * 30).to_i
MIN_THREADS = (App::Config.dig(:thread_pool, :min_threads) || 0).to_i
MAX_THREADS = (App::Config.dig(:thread_pool, :max_threads) || 5).to_i
MAX_QUEUE = (App::Config.dig(:thread_pool, :max_queue) || 100).to_i
JOB_CLEANUP_INTERVAL = (App::Config.job_cleanup_interval || 60 * 5).to_i
@running = Concurrent::AtomicBoolean.new(false)
class << self
def start!
running!
Utility::Logger.info("Starting connector service in #{App::Config.native_mode ? 'native' : 'non-native'} mode...")
start_job_cleanup_task!
# start sync jobs consumer
start_consumer!
start_polling_jobs!
end
def shutdown!
Utility::Logger.info("Shutting down connector service with pool [#{pool.class}]...")
running.make_false
job_cleanup_timer.shutdown
scheduler.shutdown
pool.shutdown
pool.wait_for_termination(TERMINATION_TIMEOUT)
stop_consumer!
end
private
attr_reader :running
def running!
raise 'connector service is already running!' unless running.make_true
end
def pool
@pool ||= Concurrent::ThreadPoolExecutor.new(
min_threads: MIN_THREADS,
max_threads: MAX_THREADS,
max_queue: MAX_QUEUE,
fallback_policy: :abort
)
end
def scheduler
@scheduler ||= if App::Config.native_mode
Core::NativeScheduler.new(POLL_INTERVAL, HEARTBEAT_INTERVAL)
else
Core::SingleScheduler.new(App::Config.connector_id, POLL_INTERVAL, HEARTBEAT_INTERVAL)
end
end
def job_cleanup_timer
@job_cleanup_timer ||= Concurrent::TimerTask.new(:execution_interval => JOB_CLEANUP_INTERVAL, :run_now => true) do
connector_id = App::Config.native_mode ? nil : App::Config.connector_id
Core::JobCleanUp.execute(connector_id)
end
end
def start_job_cleanup_task!
job_cleanup_timer.execute
end
def start_polling_jobs!
scheduler.when_triggered do |connector_settings, task|
case task
when :sync
# TODO: #update_connector_sync_now should be moved to Core::ConnectorSettings,
# there should not be any business logic related code in Core::ElasticConnectorActions.
# #update_connector_sync_now should not update `last_synced` after https://github.com/elastic/enterprise-search-team/issues/3366 is resolved,
# schedule should not based on `last_synced`
Core::ElasticConnectorActions.update_connector_sync_now(connector_settings.id, false)
Core::Jobs::Producer.enqueue_job(job_type: :sync, connector_settings: connector_settings)
when :heartbeat
start_heartbeat_task(connector_settings)
when :configuration
start_configuration_task(connector_settings)
when :filter_validation
start_filter_validation_task(connector_settings)
else
Utility::Logger.error("Unknown task type: #{task}. Skipping...")
end
end
rescue StandardError => e
Utility::ExceptionTracking.log_exception(e, 'The connector service failed due to unexpected error.')
end
def start_heartbeat_task(connector_settings)
pool.post do
Utility::Logger.info("Sending heartbeat for #{connector_settings.formatted}...")
Core::Heartbeat.send(connector_settings)
rescue StandardError => e
Utility::ExceptionTracking.log_exception(e, "Heartbeat task for #{connector_settings.formatted} failed due to unexpected error.")
end
end
def start_configuration_task(connector_settings)
pool.post do
Utility::Logger.info("Updating configuration for #{connector_settings.formatted}...")
# when in non-native mode, populate the service type if it's not in connector settings
service_type = if !App::Config.native_mode && connector_settings.needs_service_type?
App::Config.service_type
else
nil
end
Core::Configuration.update(connector_settings, service_type)
rescue StandardError => e
Utility::ExceptionTracking.log_exception(e, "Configuration task for #{connector_settings.formatted} failed due to unexpected error.")
end
end
def start_filter_validation_task(connector_settings)
pool.post do
Utility::Logger.info("Validating filters for #{connector_settings.formatted}...")
validation_job_runner = Core::Filtering::ValidationJobRunner.new(connector_settings)
validation_job_runner.execute
rescue StandardError => e
Utility::ExceptionTracking.log_exception(e, "Filter validation task for #{connector_settings.formatted} failed due to unexpected error.")
end
end
def start_consumer!
@consumer = Core::Jobs::Consumer.new(
poll_interval: POLL_INTERVAL,
termination_timeout: TERMINATION_TIMEOUT,
min_threads: MIN_THREADS,
max_threads: MAX_THREADS,
max_queue: MAX_QUEUE,
max_ingestion_queue_size: (App::Config.max_ingestion_queue_size || Utility::Constants::DEFAULT_MAX_INGESTION_QUEUE_SIZE).to_i,
max_ingestion_queue_bytes: (App::Config.max_ingestion_queue_bytes || Utility::Constants::DEFAULT_MAX_INGESTION_QUEUE_BYTES).to_i,
scheduler: scheduler
)
@consumer.subscribe!(index_name: Utility::Constants::JOB_INDEX)
end
def stop_consumer!
return if @consumer.nil?
return unless @consumer.running?
@consumer.shutdown!
end
end
end
end