lib/core/jobs/consumer.rb (101 lines of code) (raw):

#
# Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
# or more contributor license agreements. Licensed under the Elastic License;
# you may not use this file except in compliance with the Elastic License.
#
# frozen_string_literal: true

require 'utility/logger'
require 'utility/constants'
require 'core/connector_job'
require 'core/sync_job_runner'
require 'concurrent'

module Core
  module Jobs
    # Polls for connectors that are ready to sync on a fixed interval and
    # dispatches each of their pending jobs onto a bounded thread pool.
    # A consumer is bound to a single index via #subscribe!.
    class Consumer
      # @param scheduler [Object] source of connector settings; must respond to #connector_settings
      # @param max_ingestion_queue_size [Integer] forwarded to Core::SyncJobRunner
      # @param max_ingestion_queue_bytes [Integer] forwarded to Core::SyncJobRunner
      # @param poll_interval [Integer] seconds between polls for pending jobs
      # @param termination_timeout [Integer] seconds to wait for in-flight jobs on shutdown
      # @param min_threads [Integer] thread pool lower bound
      # @param max_threads [Integer] thread pool upper bound
      # @param max_queue [Integer] max queued jobs before the pool rejects new work
      # @param idle_time [Integer] seconds an idle worker thread lives before reclamation
      def initialize(scheduler:,
                     max_ingestion_queue_size:,
                     max_ingestion_queue_bytes:,
                     poll_interval: 3,
                     termination_timeout: 60,
                     min_threads: 1,
                     max_threads: 5,
                     max_queue: 100,
                     idle_time: 5)
        @scheduler = scheduler
        @poll_interval = poll_interval
        @termination_timeout = termination_timeout
        @min_threads = min_threads
        @max_threads = max_threads
        @max_queue = max_queue
        @idle_time = idle_time

        @max_ingestion_queue_size = max_ingestion_queue_size
        @max_ingestion_queue_bytes = max_ingestion_queue_bytes
      end

      # Starts the polling timer and the worker thread pool for +index_name+.
      def subscribe!(index_name:)
        Utility::Logger.info("Starting a new consumer for #{index_name} index")
        @index_name = index_name

        start_timer_task!
        start_thread_pool!
      end

      # True only while both the poller and the worker pool are alive.
      def running?
        pool&.running? && timer_task&.running?
      end

      # Stops polling, shuts the pool down and waits up to @termination_timeout
      # seconds for in-flight jobs to drain. Nil-safe, so it can be called even
      # if #subscribe! was never invoked (previously this raised NoMethodError).
      def shutdown!
        Utility::Logger.info("Shutting down consumer for #{@index_name} index")

        timer_task&.shutdown
        pool&.shutdown
        pool&.wait_for_termination(@termination_timeout)
        reset_pool!
      end

      private

      attr_reader :pool, :timer_task

      def start_timer_task!
        # run_now: true triggers an immediate first poll instead of waiting
        # a full @poll_interval.
        @timer_task = Concurrent::TimerTask.execute(execution_interval: @poll_interval, run_now: true) { execute }
      end

      def start_thread_pool!
        # fallback_policy :abort makes #post raise Concurrent::RejectedExecutionError
        # when the queue is full; #execute rescues StandardError, so a full queue
        # skips the rest of the current batch until the next poll.
        @pool = Concurrent::ThreadPoolExecutor.new(
          min_threads: @min_threads,
          max_threads: @max_threads,
          max_queue: @max_queue,
          fallback_policy: :abort,
          idletime: @idle_time
        )
      end

      def reset_pool!
        @pool = nil
      end

      # One poll cycle: collect connectors ready to sync, fetch their pending
      # jobs and dispatch each job onto the thread pool. Errors are logged and
      # swallowed so the timer task keeps running.
      def execute
        Utility::Logger.debug('Getting registered connectors')
        connectors = ready_for_sync_connectors
        return unless connectors.any?

        Utility::Logger.debug("Number of available connectors: #{connectors.size}")

        # @TODO It is assumed that @index_name is used to retrieve pending jobs.
        # This will be discussed after 8.6 release
        pending_jobs = Core::ConnectorJob.pending_jobs(connectors_ids: connectors.keys)
        Utility::Logger.info("Number of pending jobs: #{pending_jobs.size}")

        pending_jobs.each do |job|
          connector_settings = connectors[job.connector_id]
          execute_job(job, connector_settings)
        end
      rescue StandardError => e
        Utility::ExceptionTracking.log_exception(e, 'The consumer group failed')
      end

      # Schedules one sync job on the pool. Known job-level failures are logged
      # at info level; anything unexpected goes through exception tracking.
      def execute_job(job, connector_settings)
        pool.post do
          Utility::Logger.info("Connector #{connector_settings.formatted} picked up the job #{job.id}")
          Core::ElasticConnectorActions.ensure_content_index_exists(connector_settings.index_name)
          job_runner = Core::SyncJobRunner.new(
            connector_settings,
            job,
            @max_ingestion_queue_size,
            @max_ingestion_queue_bytes
          )
          job_runner.execute
        rescue Core::JobAlreadyRunningError
          Utility::Logger.info("Sync job for #{connector_settings.formatted} is already running, skipping.")
        rescue Core::ConnectorVersionChangedError => e
          Utility::Logger.info("Could not start the job because #{connector_settings.formatted} has been updated externally. Message: #{e.message}")
        rescue StandardError => e
          Utility::ExceptionTracking.log_exception(e, "Sync job for #{connector_settings.formatted} failed due to unexpected error.")
        end
      end

      # Maps connector id => connector settings for every connector that is
      # ready to sync. each_with_object avoids the per-element hash allocation
      # (accidental O(n^2)) of the previous inject + merge version.
      def ready_for_sync_connectors
        @scheduler.connector_settings
                  .select(&:ready_for_sync?)
                  .each_with_object({}) { |cs, memo| memo[cs.id] = cs }
      end
    end
  end
end