# lib/core/sync_job_runner.rb

# # Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one # or more contributor license agreements. Licensed under the Elastic License; # you may not use this file except in compliance with the Elastic License. # # frozen_string_literal: true require 'connectors/connector_status' require 'connectors/registry' require 'connectors/sync_status' require 'core/filtering/post_process_engine' require 'core/ingestion' require 'core/filtering/validation_status' require 'utility' module Core class IncompatibleConfigurableFieldsError < StandardError def initialize(service_type, expected_fields, actual_fields) super("Connector of service_type '#{service_type}' expected configurable fields: #{expected_fields}, actual stored fields: #{actual_fields}") end end class ConnectorNotFoundError < StandardError def initialize(connector_id) super("Connector is not found for connector ID '#{connector_id}'.") end end class ConnectorJobNotFoundError < StandardError def initialize(job_id) super("Connector job is not found for job ID '#{job_id}'.") end end class ConnectorJobCanceledError < StandardError def initialize(job_id) super("Connector job (ID: '#{job_id}') is requested to be canceled.") end end class ConnectorJobNotRunningError < StandardError def initialize(job_id, status) super("Connector job (ID: '#{job_id}') is not running but in status of '#{status}'.") end end class SyncJobRunner JOB_REPORTING_INTERVAL = 10 def initialize(connector_settings, job, max_ingestion_queue_size, max_ingestion_queue_bytes) @connector_settings = connector_settings @connector_id = connector_settings.id @index_name = job.index_name @service_type = job.service_type @job = job @job_id = job.id @sink = Core::Ingestion::EsSink.new( @index_name, @connector_settings.request_pipeline, Utility::BulkQueue.new( max_ingestion_queue_size, max_ingestion_queue_bytes ), max_ingestion_queue_bytes ) @connector_class = Connectors::REGISTRY.connector_class(@service_type) end def execute 
validate_configuration! do_sync! end private def do_sync! return unless claim_job! begin # We want to validate advanced filtering rules even if basic rules are disabled if @connector_settings.any_filtering_feature_enabled? Utility::Logger.info("Checking active filtering for sync job #{@job_id} for connector #{@connector_id}.") validate_filtering(@job.filtering) Utility::Logger.debug("Active filtering for sync job #{@job_id} for connector #{@connector_id} is valid.") end @connector_instance = Connectors::REGISTRY.connector(@service_type, @connector_settings.configuration, job_description: @job) @connector_instance.do_health_check! @sync_status = nil @sync_error = nil @reporting_cycle_start = Time.now incoming_ids = [] existing_ids = ElasticConnectorActions.fetch_document_ids(@index_name) Utility::Logger.debug("#{existing_ids.size} documents are present in index #{@index_name}.") post_processing_engine = @connector_settings.filtering_rule_feature_enabled? ? Core::Filtering::PostProcessEngine.new(@job.filtering) : nil yield_docs do |document| next if post_processing_engine && !post_processing_engine.process(document).is_include? 
@sink.ingest(document) incoming_ids << document['id'] end ids_to_delete = existing_ids - incoming_ids.uniq Utility::Logger.info("Deleting #{ids_to_delete.size} documents from index #{@index_name}.") ids_to_delete.each do |id| @sink.delete(id) periodically do check_job @job.update_metadata(@sink.ingestion_stats, @connector_instance.metadata) end end @sink.flush # force check at the end check_job # We use this mechanism for checking, whether an interrupt (or something else lead to the thread not finishing) # occurred as most of the time the main execution thread is interrupted and we miss this Signal/Exception here @sync_status = Connectors::SyncStatus::COMPLETED @sync_error = nil rescue ConnectorNotFoundError, ConnectorJobNotFoundError, ConnectorJobNotRunningError => e Utility::Logger.error(e.message) @sync_status = Connectors::SyncStatus::ERROR @sync_error = e.message rescue ConnectorJobCanceledError => e Utility::Logger.error(e.message) @sync_status = Connectors::SyncStatus::CANCELED # Cancelation is an expected action and we shouldn't log an error @sync_error = nil rescue StandardError => e @sync_status = Connectors::SyncStatus::ERROR @sync_error = e.message Utility::ExceptionTracking.log_exception(e) ensure stats = @sink.ingestion_stats Utility::Logger.debug("Sync stats are: #{stats}") Utility::Logger.info("Upserted #{stats[:indexed_document_count]} documents into #{@index_name}.") Utility::Logger.info("Deleted #{stats[:deleted_document_count]} documents into #{@index_name}.") # Make sure to not override a previous error message @sync_status ||= Connectors::SyncStatus::ERROR @sync_error = 'Sync thread didn\'t finish execution. Check connector logs for more details.' if @sync_status == Connectors::SyncStatus::ERROR && @sync_error.nil? # update job if it's still present if reload_job! 
case @sync_status when Connectors::SyncStatus::COMPLETED @job.done!(stats, @connector_instance&.metadata) when Connectors::SyncStatus::CANCELED @job.cancel!(stats, @connector_instance&.metadata) when Connectors::SyncStatus::ERROR @job.error!(@sync_error, stats, @connector_instance&.metadata) else Utility::Logger.error("The job is supposed to be in one of the terminal statuses (#{Connectors::SyncStatus::TERMINAL_STATUSES.join(', ')}), but it's #{@sync_status}") @sync_status = Connectors::SyncStatus::ERROR @sync_error = 'The job is not ended as expected for unknown reason' @job.error!(@sync_error, stats, @connector_instance&.metadata) end # need to reload the job to get the latest job status reload_job! end # update connector if it's still present if reload_connector! @connector_settings.update_last_sync!(@job) end Utility::Logger.info("Completed the job (ID: #{@job_id}) with status: #{@sync_status}#{@sync_error ? " and error: #{@sync_error}" : ''}") end end def yield_docs @connector_instance.yield_documents do |document| document = add_ingest_metadata(document) yield(document) if block_given? periodically do check_job @job.update_metadata(@sink.ingestion_stats, @connector_instance.metadata) end end end def claim_job! Utility::Logger.info("Claiming job (ID: #{@job_id}) for connector (ID: #{@connector_id}).") # connector service doesn't support multiple jobs running simultaneously if @connector_settings.running? Utility::Logger.warn("Failed to claim job (ID: #{@job_id}) for connector (ID: #{@connector_id}), there are already jobs running.") return false end begin Core::ElasticConnectorActions.update_connector_sync_start(@connector_id) @job.make_running! Utility::Logger.info("Successfully claimed job (ID: #{@job_id}) for connector (ID: #{@connector_id}).") true rescue StandardError => e Utility::ExceptionTracking.log_exception(e) Utility::Logger.error("Failed to claim job (ID: #{@job_id}) for connector (ID: #{@connector_id}). 
Please check the logs for the cause of this error.") false end end def add_ingest_metadata(document) return document unless @job document.tap do |it| it['_extract_binary_content'] = @job.extract_binary_content? if @job.extract_binary_content? it['_reduce_whitespace'] = @job.reduce_whitespace? if @job.reduce_whitespace? it['_run_ml_inference'] = @job.run_ml_inference? if @job.run_ml_inference? end end def validate_configuration! expected_fields = @connector_class.configurable_fields.keys.map(&:to_s).sort actual_fields = @job.configuration.keys.map(&:to_s).sort raise IncompatibleConfigurableFieldsError.new(@service_type, expected_fields, actual_fields) if expected_fields != actual_fields end def validate_filtering(filtering) validation_result = @connector_class.validate_filtering(filtering) wrong_state_error = Utility::InvalidFilterConfigError.new("Active filtering is not in valid state (current state: #{validation_result[:state]}) for connector #{@connector_id}. Please check active filtering in connectors index.") raise wrong_state_error if validation_result[:state] != Core::Filtering::ValidationStatus::VALID errors_present_error = Utility::InvalidFilterConfigError.new("Active filtering is in valid state, but errors were detected (errors: #{validation_result[:errors]}) for connector #{@connector_id}. Please check active filtering in connectors index.") raise errors_present_error if validation_result[:errors].present? end def periodically return if Time.now - @reporting_cycle_start < JOB_REPORTING_INTERVAL yield if block_given? @reporting_cycle_start = Time.now end def check_job # raise error if the connector is deleted raise ConnectorNotFoundError.new(@connector_id) unless reload_connector! # raise error if the job is deleted raise ConnectorJobNotFoundError.new(@job_id) unless reload_job! # raise error if the job is canceled raise ConnectorJobCanceledError.new(@job_id) if @job.canceling? 
# raise error if the job is not in the status in_progress raise ConnectorJobNotRunningError.new(@job_id, @job.status) unless @job.in_progress? end def reload_job! @job = ConnectorJob.fetch_by_id(@job_id) end def reload_connector! @connector_settings = ConnectorSettings.fetch_by_id(@connector_id) end end end