lib/core/jobs/consumer.rb (101 lines of code) (raw):
#
# Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
# or more contributor license agreements. Licensed under the Elastic License;
# you may not use this file except in compliance with the Elastic License.
#
# frozen_string_literal: true
require 'utility/logger'
require 'utility/constants'
require 'core/connector_job'
require 'core/sync_job_runner'
require 'concurrent'
module Core
  module Jobs
    # Periodically looks up pending sync jobs for the connectors that are ready
    # to sync and runs every job on a thread pool.
    #
    # A Concurrent::TimerTask calls #execute on each tick; each pending job is
    # posted to a Concurrent::ThreadPoolExecutor, where a Core::SyncJobRunner
    # performs the sync.
    class Consumer
      # @param scheduler [#connector_settings] source of the connector settings to inspect on each poll
      # @param max_ingestion_queue_size forwarded unchanged to Core::SyncJobRunner
      # @param max_ingestion_queue_bytes forwarded unchanged to Core::SyncJobRunner
      # @param poll_interval [Numeric] execution interval of the timer task (seconds)
      # @param termination_timeout [Numeric] how long #shutdown! waits for the pool to terminate (seconds)
      # @param min_threads [Integer] thread pool +min_threads+ option
      # @param max_threads [Integer] thread pool +max_threads+ option
      # @param max_queue [Integer] thread pool +max_queue+ option
      # @param idle_time [Numeric] thread pool +idletime+ option
      def initialize(scheduler:,
                     max_ingestion_queue_size:,
                     max_ingestion_queue_bytes:,
                     poll_interval: 3,
                     termination_timeout: 60,
                     min_threads: 1,
                     max_threads: 5,
                     max_queue: 100,
                     idle_time: 5)
        @scheduler = scheduler
        @poll_interval = poll_interval
        @termination_timeout = termination_timeout
        @min_threads = min_threads
        @max_threads = max_threads
        @max_queue = max_queue
        @idle_time = idle_time
        @max_ingestion_queue_size = max_ingestion_queue_size
        @max_ingestion_queue_bytes = max_ingestion_queue_bytes

        # Nothing is running until #subscribe! is called.
        @pool = nil
        @timer_task = nil
      end

      # Starts the thread pool and the polling timer.
      #
      # @param index_name [String] index name, stored and used in log messages
      def subscribe!(index_name:)
        Utility::Logger.info("Starting a new consumer for #{index_name} index")

        @index_name = index_name

        # The pool must exist before the timer starts: the timer is created with
        # run_now: true, so #execute may post jobs to the pool right away.
        start_thread_pool!
        start_timer_task!
      end

      # @return [Boolean, nil] truthy only while both the thread pool and the
      #   timer task report that they are running
      def running?
        pool&.running? && timer_task&.running?
      end

      # Stops polling, shuts the pool down and waits up to the termination
      # timeout for it to terminate. Safe to call when the consumer was never
      # started or has already been shut down.
      def shutdown!
        Utility::Logger.info("Shutting down consumer for #{@index_name} index")

        timer_task&.shutdown

        if pool
          pool.shutdown
          pool.wait_for_termination(@termination_timeout)
        end

        reset_pool!
      end

      private

      attr_reader :pool, :timer_task

      # Creates the timer that calls #execute every poll interval, starting immediately.
      def start_timer_task!
        @timer_task = Concurrent::TimerTask.execute(execution_interval: @poll_interval, run_now: true) { execute }
      end

      # Creates the thread pool that sync jobs are posted to.
      def start_thread_pool!
        @pool = Concurrent::ThreadPoolExecutor.new(
          min_threads: @min_threads,
          max_threads: @max_threads,
          max_queue: @max_queue,
          fallback_policy: :abort,
          idletime: @idle_time
        )
      end

      # Drops the reference to the pool so that #running? reports falsy.
      def reset_pool!
        @pool = nil
      end

      # One polling cycle: collects the connectors that are ready for sync,
      # fetches their pending jobs and hands each job to #execute_job.
      # Any StandardError raised along the way is logged, not propagated.
      def execute
        Utility::Logger.debug('Getting registered connectors')

        connectors = ready_for_sync_connectors
        return if connectors.empty?

        Utility::Logger.debug("Number of available connectors: #{connectors.size}")

        # @TODO It is assumed that @index_name is used to retrieve pending jobs.
        # This will be discussed after 8.6 release
        pending_jobs = Core::ConnectorJob.pending_jobs(connectors_ids: connectors.keys)
        Utility::Logger.info("Number of pending jobs: #{pending_jobs.size}")

        pending_jobs.each do |job|
          execute_job(job, connectors[job.connector_id])
        end
      rescue StandardError => e
        Utility::ExceptionTracking.log_exception(e, 'The consumer group failed')
      end

      # Posts a single sync job to the thread pool. Errors raised while the job
      # runs are handled (logged) inside the pool task.
      #
      # @param job pending job; must respond to +id+
      # @param connector_settings settings of the connector the job belongs to
      def execute_job(job, connector_settings)
        pool.post do
          Utility::Logger.info("Connector #{connector_settings.formatted} picked up the job #{job.id}")
          Core::ElasticConnectorActions.ensure_content_index_exists(connector_settings.index_name)
          job_runner = Core::SyncJobRunner.new(
            connector_settings,
            job,
            @max_ingestion_queue_size,
            @max_ingestion_queue_bytes
          )
          job_runner.execute
        rescue Core::JobAlreadyRunningError
          Utility::Logger.info("Sync job for #{connector_settings.formatted} is already running, skipping.")
        rescue Core::ConnectorVersionChangedError => e
          Utility::Logger.info("Could not start the job because #{connector_settings.formatted} has been updated externally. Message: #{e.message}")
        rescue StandardError => e
          Utility::ExceptionTracking.log_exception(e, "Sync job for #{connector_settings.formatted} failed due to unexpected error.")
        end
      end

      # @return [Hash] connector settings that report ready_for_sync?, keyed by their id
      def ready_for_sync_connectors
        @scheduler.connector_settings
                  .select(&:ready_for_sync?)
                  .each_with_object({}) { |settings, by_id| by_id[settings.id] = settings }
      end
    end
  end
end