spec/core/jobs/consumer_spec.rb (133 lines of code) (raw):

require 'core/jobs/consumer' describe Core::Jobs::Consumer do let(:scheduler) { double } let(:default_consumer_options) do { max_ingestion_queue_size: 100, max_ingestion_queue_bytes: 1000, scheduler: scheduler } end let(:index_name) { 'test_index' } describe '#initialize' do subject { described_class.new(default_consumer_options) } it 'creates a Consumer instance' do is_expected.to be_kind_of(Core::Jobs::Consumer) end it 'does not start consumer' do is_expected.not_to be_running end end describe '#subscribe' do subject { described_class.new(default_consumer_options) } let(:timer_task) { double } before(:example) do allow(Concurrent::TimerTask).to receive(:execute).and_return(timer_task) allow(timer_task).to receive(:running?).and_return(true) allow(Concurrent::ThreadPoolExecutor).to receive(:new).and_return(double) end it 'starts a concurrent timer task' do subject.subscribe!(index_name: index_name) expect(Concurrent::TimerTask).to have_received(:execute) end it 'starts a ThreadPoolExecutor pool' do subject.subscribe!(index_name: index_name) expect(Concurrent::ThreadPoolExecutor).to have_received(:new) end end describe '#shutdown' do subject { described_class.new(default_consumer_options) } let(:timer_task) { double } let(:pool) { double } before(:example) do allow(Concurrent::TimerTask).to receive(:execute).and_return(timer_task) allow(timer_task).to receive(:shutdown).and_return(true) allow(Concurrent::ThreadPoolExecutor).to receive(:new).and_return(pool) allow(pool).to receive(:shutdown).and_return(true) allow(pool).to receive(:wait_for_termination).and_return(true) subject.subscribe!(index_name: index_name) end it 'shutdowns the timer task' do subject.shutdown! expect(timer_task).to have_received(:shutdown) end it 'shutdowns the thread pool' do subject.shutdown! expect(pool).to have_received(:shutdown) expect(pool).to have_received(:wait_for_termination) end end class FakeTimerTask def self.assign_proc(execute_proc) @execute_proc = execute_proc end def self.execute_proc @execute_proc.call end end describe 'execute' do let(:pool) { double } before(:example) do allow(Concurrent::TimerTask).to receive(:execute) do |_args, &block| FakeTimerTask.assign_proc(block) FakeTimerTask end allow(pool).to receive(:post) allow(Concurrent::ThreadPoolExecutor).to receive(:new).and_return(pool) end context 'when there is no ready_for_sync connectors' do it 'does not post a job to the thread pool' do allow(scheduler).to receive(:connector_settings).and_return([]) consumer = described_class.new(default_consumer_options) consumer.subscribe!(index_name: index_name) FakeTimerTask.execute_proc expect(pool).not_to have_received(:post) end end context 'when there are ready_for_sync connectors' do let(:connector_settings) { double } let(:connector_id) { '123' } let(:connector_index_name) { 'search-123' } let(:job) { double } let(:job_id) { '1234' } let(:pending_jobs) { [] } let(:consumer) { described_class.new(default_consumer_options) } before(:example) do allow(connector_settings).to receive(:id).and_return(connector_id) allow(connector_settings).to receive(:ready_for_sync?).and_return(true) allow(connector_settings).to receive(:formatted).and_return(connector_id.to_s) allow(connector_settings).to receive(:index_name).and_return(connector_index_name) allow(scheduler).to receive(:connector_settings).and_return([connector_settings]) allow(job).to receive(:connector_id).and_return(connector_id) allow(job).to receive(:id).and_return(job_id) allow(Core::ConnectorJob).to receive(:pending_jobs).and_return(pending_jobs) consumer.subscribe!(index_name: index_name) end context 'when there are pending jobs' do let(:pending_jobs) { [job] } let(:sync_job_runner) { double } it 'posts a job to the thread pool' do FakeTimerTask.execute_proc expect(pool).to have_received(:post) end it 'executes SyncRunner' do allow_any_instance_of(Core::SyncJobRunner).to receive(:execute) allow(Core::ElasticConnectorActions).to receive(:ensure_content_index_exists) allow(Core::SyncJobRunner).to receive(:new).and_return(sync_job_runner) allow(sync_job_runner).to receive(:execute) allow(pool).to receive(:post) do |&block| block.call end FakeTimerTask.execute_proc expect(sync_job_runner).to have_received(:execute) end end context 'when there is no pedning jobs' do it 'does not post a job to the thread pool' do FakeTimerTask.execute_proc expect(pool).not_to have_received(:post) end end end end end