spec/core/sync_job_runner_spec.rb (422 lines of code) (raw):

require 'connectors/connector_status'
require 'connectors/sync_status'
require 'core'
require 'utility'

# Spec for Core::SyncJobRunner: verifies job claiming, filtering validation,
# document ingestion/deletion through the sink, job status reporting, and
# error/cancellation handling during a full sync.
describe Core::SyncJobRunner do
  let(:connector_id) { '123' }
  let(:service_type) { 'foo' }
  let(:request_pipeline) { Core::ConnectorSettings::DEFAULT_REQUEST_PIPELINE }
  let(:connector_status) { Connectors::ConnectorStatus::CONNECTED }
  let(:connector_stored_configuration) do
    # returned from Elasticsearch with values already specified by user
    {
      :lala => { :label => 'Lala', :value => 'hello' }
    }
  end
  let(:connector_default_configuration) do
    # returned from Connector class with default values
    {
      :lala => { :label => 'Lala', :value => nil }
    }
  end
  let(:connector_settings) { double }
  let(:filtering) do
    [
      {
        'domain' => Core::Filtering::DEFAULT_DOMAIN,
        'rules' => [
          Core::Filtering::SimpleRule::DEFAULT_RULE.to_h
        ],
        'advanced_snippet' => {
          'value' => {}
        }
      }
    ]
  end
  let(:job) { double }
  let(:connector_class) { double }
  let(:connector_instance) { double }
  let(:sink) { double }
  let(:output_index_name) { 'test-ingest-index' }
  let(:existing_document_ids) { [] } # ids of documents that are already in the index
  let(:extracted_documents) { [] } # documents returned from 3rd-party system
  let(:connector_metadata) { { :foo => 'bar' } } # metadata returned from connectors
  let(:any_filtering_feature_enabled) { true }
  let(:filtering_rule_feature_enabled) { true }
  let(:filtering_validation_result) do
    {
      :state => Core::Filtering::ValidationStatus::VALID,
      :errors => []
    }
  end
  let(:connector_running) { false }
  let(:job_id) { 'job-123' }
  let(:job_canceling) { false }
  let(:job_in_progress) { true }
  let(:error_message) { nil }
  let(:extract_binary_content) { true }
  let(:reduce_whitespace) { true }
  let(:run_ml_inference) { true }
  let(:total_document_count) { 100 }
  let(:ingestion_stats) do
    {
      :indexed_document_count => 12,
      :deleted_document_count => 234,
      :indexed_document_volume => 0
    }
  end
  let(:max_ingestion_queue_size) { 123 }
  let(:max_ingestion_queue_bytes) { 123456789 }

  subject { described_class.new(connector_settings, job, max_ingestion_queue_size, max_ingestion_queue_bytes) }

  before(:each) do
    allow(Core::ConnectorSettings).to receive(:fetch_by_id).with(connector_id).and_return(connector_settings)
    allow(Core::ConnectorJob).to receive(:fetch_by_id).with(job_id).and_return(job)

    allow(job).to receive(:id).and_return(job_id)
    allow(job).to receive(:make_running!)
    allow(job).to receive(:filtering).and_return(filtering)
    allow(job).to receive(:update_metadata)
    allow(job).to receive(:done!)
    allow(job).to receive(:cancel!)
    allow(job).to receive(:error!)
    allow(job).to receive(:canceling?).and_return(job_canceling)
    allow(job).to receive(:in_progress?).and_return(job_in_progress)
    allow(job).to receive(:index_name).and_return(output_index_name)
    allow(job).to receive(:service_type).and_return(service_type)
    allow(job).to receive(:extract_binary_content?).and_return(extract_binary_content)
    allow(job).to receive(:reduce_whitespace?).and_return(reduce_whitespace)
    allow(job).to receive(:run_ml_inference?).and_return(run_ml_inference)
    allow(job).to receive(:configuration).and_return(connector_stored_configuration)

    allow(Core::ElasticConnectorActions).to receive(:fetch_document_ids).and_return(existing_document_ids)
    allow(Core::ElasticConnectorActions).to receive(:update_connector_status)
    allow(Core::ElasticConnectorActions).to receive(:update_connector_sync_start)

    allow(Connectors::REGISTRY).to receive(:connector_class).and_return(connector_class)

    allow(Core::Ingestion::EsSink).to receive(:new).and_return(sink)
    allow(sink).to receive(:ingest)
    allow(sink).to receive(:delete)
    allow(sink).to receive(:flush)
    allow(sink).to receive(:ingestion_stats).and_return(ingestion_stats)

    allow(connector_settings).to receive(:id).and_return(connector_id)
    allow(connector_settings).to receive(:configuration).and_return(connector_stored_configuration)
    allow(connector_settings).to receive(:request_pipeline).and_return(request_pipeline)
    allow(connector_settings).to receive(:running?).and_return(connector_running)
    allow(connector_settings).to receive(:update_last_sync!)
    allow(connector_settings).to receive(:any_filtering_feature_enabled?).and_return(any_filtering_feature_enabled)
    allow(connector_settings).to receive(:filtering_rule_feature_enabled?).and_return(filtering_rule_feature_enabled)

    allow(connector_class).to receive(:configurable_fields).and_return(connector_default_configuration)
    allow(connector_class).to receive(:validate_filtering).and_return(filtering_validation_result)
    allow(connector_class).to receive(:new).and_return(connector_instance)

    allow(connector_instance).to receive(:metadata).and_return(connector_metadata)
    allow(connector_instance).to receive(:do_health_check!)

    # Stub yield_documents to yield each extracted document in order.
    allow_statement = allow(connector_instance).to receive(:yield_documents)
    extracted_documents.each { |document| allow_statement.and_yield(document) }

    # set to a large number to skip job check
    stub_const("#{described_class}::JOB_REPORTING_INTERVAL", 10000)
  end

  describe '#new' do
    let(:bulk_queue) { double }

    before(:each) do
      allow(Utility::BulkQueue).to receive(:new).and_return(bulk_queue)
    end

    it 'passes max_ingestion_queue_size and max_ingestion_queue_bytes to ingestion classes' do
      expect(Utility::BulkQueue).to receive(:new)
        .with(max_ingestion_queue_size, max_ingestion_queue_bytes)

      expect(Core::Ingestion::EsSink).to receive(:new)
        .with(anything, anything, bulk_queue, max_ingestion_queue_bytes)

      described_class.new(connector_settings, job, max_ingestion_queue_size, max_ingestion_queue_bytes)
    end
  end

  describe '#execute' do
    shared_examples_for 'claims the job' do
      it '' do
        expect(Core::ElasticConnectorActions).to receive(:update_connector_sync_start)
        expect(job).to receive(:make_running!)

        subject.execute
      end
    end

    shared_examples_for 'does not run a sync' do
      it '' do
        expect(job).to_not receive(:done!)
        expect(job).to_not receive(:cancel!)
        expect(job).to_not receive(:error!)
        expect(connector_settings).to_not receive(:update_last_sync!).with(job)

        subject.execute
      end
    end

    shared_examples_for 'sync stops with error' do
      it 'stops with error' do
        expect(job).to receive(:error!).with(error_message, ingestion_stats, connector_metadata)
        expect(connector_settings).to receive(:update_last_sync!).with(job)

        subject.execute
      end
    end

    shared_examples_for 'runs a full sync' do
      it 'finishes a sync job' do
        expect(job).to receive(:done!).with(ingestion_stats, connector_metadata)
        expect(connector_settings).to receive(:update_last_sync!).with(job)

        subject.execute
      end
    end

    context 'when connector was already configured with different configurable field set' do
      let(:connector_stored_configuration) do
        {
          :foo => { :label => 'Foo', :value => nil }
        }
      end
      let(:connector_default_configuration) do
        {
          :lala => { :label => 'Lala', :value => 'hello' }
        }
      end

      it 'raises an error' do
        expect { subject.execute }.to raise_error(Core::IncompatibleConfigurableFieldsError)
      end
    end

    context 'when connector is running' do
      let(:connector_running) { true }

      it_behaves_like 'does not run a sync'
    end

    context 'when failing to make connector running' do
      before(:each) do
        allow(Core::ElasticConnectorActions).to receive(:update_connector_sync_start).and_raise(StandardError)
      end

      it_behaves_like 'does not run a sync'
    end

    context 'when failing to make job running' do
      before(:each) do
        allow(job).to receive(:make_running!).and_raise(StandardError)
      end

      it_behaves_like 'does not run a sync'
    end

    context 'when filtering is in state invalid' do
      let(:error_message) { "Active filtering is not in valid state (current state: #{filtering_validation_result[:state]}) for connector #{connector_id}. Please check active filtering in connectors index." }
      let(:connector_metadata) { nil }
      let(:filtering_validation_result) do
        {
          :state => Core::Filtering::ValidationStatus::INVALID,
          :errors => []
        }
      end

      it_behaves_like 'sync stops with error'
    end

    context 'when filtering is in state edited' do
      let(:error_message) { "Active filtering is not in valid state (current state: #{filtering_validation_result[:state]}) for connector #{connector_id}. Please check active filtering in connectors index." }
      let(:connector_metadata) { nil }
      let(:filtering_validation_result) do
        {
          :state => Core::Filtering::ValidationStatus::EDITED,
          :errors => []
        }
      end

      it_behaves_like 'sync stops with error'
    end

    context 'when filtering is in state valid, but errors are present' do
      let(:error_message) { "Active filtering is in valid state, but errors were detected (errors: #{filtering_validation_result[:errors]}) for connector #{connector_id}. Please check active filtering in connectors index." }
      let(:connector_metadata) { nil }
      let(:filtering_validation_result) do
        {
          :state => Core::Filtering::ValidationStatus::VALID,
          :errors => ['Error']
        }
      end

      it_behaves_like 'sync stops with error'
    end

    it_behaves_like 'claims the job'

    it 'flushes the sink' do
      # We don't ingest anything, but flush still happens just in case.
      # This is done so that the last batch of documents is always ingested into the sink
      expect(sink).to receive(:flush)

      subject.execute
    end

    it_behaves_like 'runs a full sync'

    context 'when an error occurs' do
      let(:error_message) { 'error message' }

      before(:each) do
        allow(connector_instance).to receive(:do_health_check!).and_raise(StandardError.new(error_message))
      end

      it_behaves_like 'sync stops with error'
    end

    context 'when validation thread did not finish execution' do
      let(:error_message) { 'Sync thread didn\'t finish execution. Check connector logs for more details.' }

      before(:each) do
        # Exception, which is not rescued (treated like something, which stopped the sync thread)
        allow(connector_instance).to receive(:do_health_check!).and_raise(Exception.new('Oh no!'))
      end

      it 'sets an error, that the validation thread was killed' do
        # Check for exception thrown on purpose, so that the test is not marked as failed for the wrong reason
        expect { subject.execute }.to raise_exception

        expect(subject.instance_variable_get(:@sync_status)).to eq(Connectors::SyncStatus::ERROR)
        expect(subject.instance_variable_get(:@sync_error)).to eq('Sync thread didn\'t finish execution. Check connector logs for more details.')
      end
    end

    context 'when a bunch of documents are returned from 3rd-party system' do
      let(:doc1) do
        {
          :id => 1,
          :title => 'Hello',
          :body => 'World'
        }
      end
      let(:doc2) do
        {
          :id => 2,
          :title => 'thanks',
          :body => 'for the fish'
        }
      end
      let(:extracted_documents) { [doc1, doc2] } # documents returned from 3rd-party system

      shared_examples_for 'indexes all docs' do
        it '' do
          expect(sink).to receive(:ingest).with(doc1)
          expect(sink).to receive(:ingest).with(doc2)

          subject.execute
        end
      end

      shared_examples_for 'validates filtering' do
        it '' do
          expect(connector_class).to receive(:validate_filtering)

          subject.execute
        end
      end

      shared_context 'exclude one document' do
        let(:additional_rules) do
          [
            Core::Filtering::SimpleRule.from_args('1', 'exclude', 'title', 'equals', 'Hello').to_h
          ]
        end

        before(:each) do
          filtering[0]['rules'].unshift(*additional_rules)
        end
      end

      it_behaves_like 'validates filtering'
      it_behaves_like 'indexes all docs'
      it_behaves_like 'runs a full sync'

      context 'when filtering rule feature is enabled' do
        include_context 'exclude one document'

        let(:any_filtering_feature_enabled) { true }
        let(:filtering_rule_feature_enabled) { true }

        it 'does not ingest the excluded document' do
          expect(sink).to_not receive(:ingest).with(doc1)
          expect(sink).to receive(:ingest).with(doc2)

          subject.execute
        end

        context 'with non-matching rule' do
          let(:additional_rules) do
            [
              Core::Filtering::SimpleRule.from_args('1', 'exclude', 'foo', 'equals', 'Hello').to_h
            ]
          end

          it_behaves_like 'validates filtering'
          it_behaves_like 'indexes all docs'
          it_behaves_like 'runs a full sync'
        end
      end

      context 'when no filtering feature is enabled' do
        # will be ignored as no filtering feature is enabled
        include_context 'exclude one document'

        let(:any_filtering_feature_enabled) { false }
        let(:filtering_rule_feature_enabled) { false }

        # filtering won't be validated if all filtering features are disabled
        it 'does not validate filtering' do
          expect(connector_class).to_not receive(:validate_filtering)

          # BUGFIX: the sync must actually run for the negative expectation
          # above to be meaningful — without this call the example passed
          # trivially without exercising any code.
          subject.execute
        end

        it_behaves_like 'indexes all docs'
        it_behaves_like 'runs a full sync'
      end

      context 'when another filtering feature is enabled while filtering basic rule feature is disabled' do
        # will be ignored as basic rule filtering is disabled
        include_context 'exclude one document'

        let(:any_filtering_feature_enabled) { true }
        let(:filtering_rule_feature_enabled) { false }

        it_behaves_like 'validates filtering'
        it_behaves_like 'indexes all docs'
        it_behaves_like 'runs a full sync'
      end

      context 'when some documents were present before' do
        let(:existing_document_ids) { [3, 4, 'lala', 'some other id'] }

        it 'attempts to remove existing documents' do
          existing_document_ids.each do |id|
            expect(sink).to receive(:delete).with(id)
          end

          subject.execute
        end

        it_behaves_like 'runs a full sync'

        context 'when an error happens during sync' do
          let(:error_message) { 'whoops' }

          before(:each) do
            allow(sink).to receive(:flush).and_raise('whoops')
          end

          it_behaves_like 'sync stops with error'
        end
      end

      context 'with reporting' do
        before(:each) do
          # it will check job and report metadata for every document
          stub_const("#{described_class}::JOB_REPORTING_INTERVAL", 0)
        end

        it 'reports metadata' do
          expect(job).to receive(:update_metadata).with(ingestion_stats, connector_metadata)

          subject.execute
        end

        context 'when connector is deleted' do
          before(:each) do
            allow(Core::ConnectorSettings).to receive(:fetch_by_id).and_return(nil)
          end

          it 'marks the job as error' do
            expect(job).to receive(:error!).with(Core::ConnectorNotFoundError.new(connector_id).message, ingestion_stats, connector_metadata)

            subject.execute
          end
        end

        context 'when job is deleted' do
          before(:each) do
            allow(Core::ConnectorJob).to receive(:fetch_by_id).and_return(nil)
          end

          it 'updates connector' do
            expect(connector_settings).to receive(:update_last_sync!)

            subject.execute
          end
        end

        context 'when job is canceled' do
          let(:job_canceling) { true }

          it 'cancels the job' do
            expect(job).to receive(:cancel!).with(ingestion_stats, connector_metadata)
            expect(connector_settings).to receive(:update_last_sync!)

            subject.execute
          end
        end

        context 'when job is not in_progress' do
          let(:job_in_progress) { false }
          let(:job_status) { Connectors::SyncStatus::COMPLETED }

          before(:each) do
            allow(job).to receive(:status).and_return(job_status)
          end

          it 'marks the job error' do
            expect(job).to receive(:error!).with(Core::ConnectorJobNotRunningError.new(job_id, job_status).message, ingestion_stats, connector_metadata)
            expect(connector_settings).to receive(:update_last_sync!)

            subject.execute
          end
        end
      end
    end
  end

  context 'ingest metadata' do
    let(:document) { { 'body' => 'hello, world' } }

    it 'augments document data' do
      subject.send(:add_ingest_metadata, document)

      expect(document['_extract_binary_content']).to be
      expect(document['_reduce_whitespace']).to be
      expect(document['_run_ml_inference']).to be
    end

    context 'when the settings are false' do
      let(:extract_binary_content) { false }
      let(:reduce_whitespace) { false }
      let(:run_ml_inference) { false }

      it 'does not augment data' do
        subject.send(:add_ingest_metadata, document)

        expect(document['_extract_binary_content']).to_not be
        expect(document['_reduce_whitespace']).to_not be
        expect(document['_run_ml_inference']).to_not be
      end
    end
  end
end