# in lib/core/sync_job_runner.rb [76:180]
# Runs one full content sync for the claimed job: validates active filtering
# (when enabled), streams documents from the connector into the sink, deletes
# documents that disappeared at the source, and records a terminal job status
# (COMPLETED / CANCELED / ERROR) plus ingestion stats on the job document.
def do_sync!
  return unless claim_job!

  begin
    if @connector_settings.any_filtering_feature_enabled?
      Utility::Logger.info("Checking active filtering for sync job #{@job_id} for connector #{@connector_id}.")
      validate_filtering(@job.filtering)
      Utility::Logger.debug("Active filtering for sync job #{@job_id} for connector #{@connector_id} is valid.")
    end

    @connector_instance = Connectors::REGISTRY.connector(@service_type, @connector_settings.configuration, job_description: @job)
    @connector_instance.do_health_check!

    @sync_status = nil
    @sync_error = nil
    @reporting_cycle_start = Time.now

    incoming_ids = []
    existing_ids = ElasticConnectorActions.fetch_document_ids(@index_name)
    Utility::Logger.debug("#{existing_ids.size} documents are present in index #{@index_name}.")

    post_processing_engine = @connector_settings.filtering_rule_feature_enabled? ? Core::Filtering::PostProcessEngine.new(@job.filtering) : nil

    yield_docs do |document|
      # Skip documents that the post-processing filtering rules exclude (when enabled).
      next if post_processing_engine && !post_processing_engine.process(document).is_include?
      @sink.ingest(document)
      incoming_ids << document['id']
    end

    # Anything present in the index but absent from this sync's payload is stale.
    ids_to_delete = existing_ids - incoming_ids.uniq

    Utility::Logger.info("Deleting #{ids_to_delete.size} documents from index #{@index_name}.")

    ids_to_delete.each do |id|
      @sink.delete(id)

      # Periodically verify the job is still alive and report ingestion progress.
      periodically do
        check_job
        @job.update_metadata(@sink.ingestion_stats, @connector_instance.metadata)
      end
    end

    @sink.flush

    # Confirm the job wasn't canceled/deleted out from under us before declaring success.
    check_job

    @sync_status = Connectors::SyncStatus::COMPLETED
    @sync_error = nil
  rescue ConnectorNotFoundError, ConnectorJobNotFoundError, ConnectorJobNotRunningError => e
    Utility::Logger.error(e.message)
    @sync_status = Connectors::SyncStatus::ERROR
    @sync_error = e.message
  rescue ConnectorJobCanceledError => e
    Utility::Logger.error(e.message)
    @sync_status = Connectors::SyncStatus::CANCELED
    # Cancellation is a terminal state but not an error, so no error message is kept.
    @sync_error = nil
  rescue StandardError => e
    @sync_status = Connectors::SyncStatus::ERROR
    @sync_error = e.message
    Utility::ExceptionTracking.log_exception(e)
  ensure
    stats = @sink.ingestion_stats

    Utility::Logger.debug("Sync stats are: #{stats}")
    Utility::Logger.info("Upserted #{stats[:indexed_document_count]} documents into #{@index_name}.")
    # FIX: message previously said "Deleted ... documents into", which misreads as
    # an insert; "from" matches the deletion message logged above.
    Utility::Logger.info("Deleted #{stats[:deleted_document_count]} documents from #{@index_name}.")

    # If the sync thread died without reaching a terminal status, force ERROR
    # with a generic message so the job document is never left dangling.
    @sync_status ||= Connectors::SyncStatus::ERROR
    @sync_error = 'Sync thread didn\'t finish execution. Check connector logs for more details.' if @sync_status == Connectors::SyncStatus::ERROR && @sync_error.nil?

    if reload_job!
      case @sync_status
      when Connectors::SyncStatus::COMPLETED
        @job.done!(stats, @connector_instance&.metadata)
      when Connectors::SyncStatus::CANCELED
        @job.cancel!(stats, @connector_instance&.metadata)
      when Connectors::SyncStatus::ERROR
        @job.error!(@sync_error, stats, @connector_instance&.metadata)
      else
        Utility::Logger.error("The job is supposed to be in one of the terminal statuses (#{Connectors::SyncStatus::TERMINAL_STATUSES.join(', ')}), but it's #{@sync_status}")
        @sync_status = Connectors::SyncStatus::ERROR
        @sync_error = 'The job is not ended as expected for unknown reason'
        @job.error!(@sync_error, stats, @connector_instance&.metadata)
      end
      # Re-fetch the job document after writing the terminal status.
      reload_job!
    end

    if reload_connector!
      @connector_settings.update_last_sync!(@job)
    end

    # FIX: this line was truncated mid-string (unterminated literal) — reconstructed
    # so the error suffix is appended only when @sync_error is present.
    Utility::Logger.info("Completed the job (ID: #{@job_id}) with status: #{@sync_status}#{@sync_error ? " and error: #{@sync_error}" : ''}")
  end
end