spec/app/dispatcher_spec.rb (175 lines of code) (raw):
#
# Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
# or more contributor license agreements. Licensed under the Elastic License;
# you may not use this file except in compliance with the Elastic License.
#
# frozen_string_literal: true
require 'core'
require 'core/filtering/validation_job_runner'
require 'app/dispatcher'
describe App::Dispatcher do
let(:scheduler) { double }
let(:pool) { double }
let(:job_cleanup_timer) { double }
let(:sync_job_runner) { double }
let(:filter_validation_job_runner) { double }
let(:connector_id) { 123 }
let(:info_message) { nil }
before(:each) do
allow(described_class).to receive(:scheduler).and_return(scheduler)
allow(described_class).to receive(:pool).and_return(pool)
allow(described_class).to receive(:job_cleanup_timer).and_return(job_cleanup_timer)
allow(Core::ElasticConnectorActions).to receive(:ensure_content_index_exists)
allow(pool).to receive(:post).and_yield
allow(job_cleanup_timer).to receive(:execute)
allow(scheduler).to receive(:when_triggered)
allow(Core::SyncJobRunner).to receive(:new).and_return(sync_job_runner)
allow(sync_job_runner).to receive(:execute)
allow(Core::Filtering::ValidationJobRunner).to receive(:new).and_return(filter_validation_job_runner)
allow(filter_validation_job_runner).to receive(:execute)
allow(Core::Heartbeat).to receive(:send)
allow(Utility::ExceptionTracking).to receive(:log_exception)
allow(Utility::Logger).to receive(:info)
allow(Core::ElasticConnectorActions).to receive(:update_connector_sync_now).and_return(true)
allow(Core::Jobs::Producer).to receive(:enqueue_job).and_return(true)
stub_const('App::Dispatcher::POLL_INTERVAL', 1)
stub_const('App::Dispatcher::TERMINATION_TIMEOUT', 1)
stub_const('App::Dispatcher::HEARTBEAT_INTERVAL', 60 * 30)
stub_const('App::Dispatcher::MIN_THREADS', 0)
stub_const('App::Dispatcher::MAX_THREADS', 5)
stub_const('App::Dispatcher::MAX_QUEUE', 100)
end
after(:each) do
described_class.instance_variable_set(:@running, Concurrent::AtomicBoolean.new(false))
end
describe '.start!' do
context 'when it\'s called twice' do
before(:each) do
allow(described_class).to receive(:start_polling_jobs!)
described_class.start!
end
it 'raises error' do
expect { described_class.start! }.to raise_error
end
end
it 'starts the job clean up task' do
expect(job_cleanup_timer).to receive(:execute)
described_class.start!
end
context 'without native connectors' do
it 'starts no sync jobs' do
expect(described_class).to_not receive(:start_sync_task)
expect(described_class).to_not receive(:start_heartbeat_task)
expect(described_class).to_not receive(:start_configuration_task)
described_class.start!
end
end
context 'with native connectors' do
let(:connector_settings) { double }
before(:each) do
allow(scheduler).to receive(:when_triggered).and_yield(connector_settings, task)
allow(connector_settings).to receive(:formatted).and_return('')
end
shared_examples_for 'logs exception' do
it 'logs exception' do
expect(Utility::ExceptionTracking).to receive(:log_exception)
expect { described_class.start! }.to_not raise_error
end
end
shared_examples_for('logs info') do
it 'does log info' do
expect { described_class.start! }.to_not raise_error
expect(Utility::Logger).to have_received(:info).with(Regexp.new(info_message, Regexp::IGNORECASE))
end
end
context 'with invalid task' do
let(:task) { :invalid }
it 'logs error' do
expect(Utility::Logger).to receive(:error)
expect { described_class.start! }.to_not raise_error
end
end
context 'with sync task' do
let(:task) { :sync }
before(:each) do
allow(connector_settings).to receive(:service_type).and_return('')
allow(connector_settings).to receive(:index_name).and_return('')
allow(connector_settings).to receive(:id).and_return('connector_id')
end
shared_examples_for 'sync' do
it 'starts sync job' do
# creates a new job document
expect(Core::ElasticConnectorActions).to receive(:update_connector_sync_now)
expect(Core::Jobs::Producer).to receive(:enqueue_job)
expect { described_class.start! }.to_not raise_error
end
end
it_behaves_like 'sync'
end
context 'with heartbeat task' do
let(:task) { :heartbeat }
it 'should send heartbeat' do
expect(Core::Heartbeat).to receive(:send)
expect { described_class.start! }.to_not raise_error
end
context 'when heartbeat throws an error' do
before(:each) do
allow(Core::Heartbeat).to receive(:send).and_raise('Oh no!')
end
it_behaves_like 'logs exception'
end
end
context 'with configuration task' do
let(:task) { :configuration }
let(:native_mode) { true }
let(:needs_service_type) { false }
let(:service_type) { 'custom' }
before(:each) do
allow(connector_settings).to receive(:needs_service_type?).and_return(needs_service_type)
allow(App::Config).to receive(:native_mode).and_return(native_mode)
allow(App::Config).to receive(:service_type).and_return(service_type)
end
it 'should update configuration without service type' do
expect(Core::Configuration).to receive(:update).with(connector_settings, nil)
expect { described_class.start! }.to_not raise_error
end
context 'in non-native mode' do
let(:native_mode) { false }
let(:needs_service_type) { true }
it 'should update configuration with service type' do
expect(Core::Configuration).to receive(:update).with(connector_settings, service_type)
expect { described_class.start! }.to_not raise_error
end
end
context 'when configuration throws an error' do
before(:each) do
allow(Core::Configuration).to receive(:update).and_raise('Oh no!')
end
it_behaves_like 'logs exception'
end
end
context 'with filter validation task' do
let(:task) { :filter_validation }
before(:each) do
allow(filter_validation_job_runner).to receive(:execute)
end
it 'should run the filter validation task' do
expect(described_class).to receive(:start_filter_validation_task)
expect { described_class.start! }.to_not raise_error
end
context 'when filter validation throws an error' do
before(:each) do
allow(filter_validation_job_runner).to receive(:execute).and_raise('Oh no!')
end
it_behaves_like 'logs exception'
end
end
end
end
describe '.shutdown!' do
before(:each) do
allow(described_class).to receive(:start_polling_jobs!)
described_class.start!
end
it 'shuts down correctly' do
expect(scheduler).to receive(:shutdown)
expect(pool).to receive(:shutdown)
expect(pool).to receive(:wait_for_termination)
expect(job_cleanup_timer).to receive(:shutdown)
described_class.shutdown!
end
end
end