spec/aws/active_job/sqs/executor_spec.rb (114 lines of code) (raw):

# frozen_string_literal: true module Aws module ActiveJob module SQS describe Executor do let(:logger) { double(info: nil, debug: nil) } before do allow(ActiveSupport::Logger).to receive(:new).and_return(logger) end it 'merges runtime options with defaults' do expected = Executor::DEFAULTS.merge(max_queue: 10) expect(Concurrent::ThreadPoolExecutor).to receive(:new).with(expected) Executor.new(max_queue: 10) end describe '#execute' do let(:body) { ActiveSupport::JSON.dump(TestJob.new('a1', 'a2').serialize) } # message is a reserved minitest name let(:msg) { double(data: double(body: body)) } let(:executor) { Executor.new } let(:runner) { double('runner', id: 'jobid', class_name: 'jobclass', exception_executions?: false) } it 'executes the job and deletes the message' do expect(JobRunner).to receive(:new).and_return(runner) expect(runner).to receive(:run) expect(msg).to receive(:delete) executor.execute(msg) executor.shutdown # give the job a chance to run end it 'raises the error and terminates poller' do expect(JobRunner).to receive(:new).and_return(runner) expect(runner).to receive(:run).and_raise StandardError expect do executor.execute(msg) executor.shutdown # give the job a chance to run end.to raise_exception(StandardError) end describe 'error_handler' do let(:error_handler) { double } let(:executor) { Executor.new(error_handler: error_handler) } let(:exception) { StandardError.new } it 'calls the error handler with exception and message' do expect(JobRunner).to receive(:new).and_return(runner) expect(runner).to receive(:run).and_raise exception expect(error_handler).to receive(:call).with(exception, msg) expect(executor).to receive(:shutdown).exactly(1).times.and_call_original executor.execute(msg) executor.shutdown # give the job a chance to run end end describe 'backpressure' do let(:executor) { Executor.new(max_threads: 1, max_queue: 1) } let(:trigger) { Concurrent::Event.new } it 'waits for a tasks to complete before attempting to post new tasks' do task_complete_event = executor.instance_variable_get(:@task_complete) expect(JobRunner).to receive(:new).at_least(:once).and_return(runner) allow(msg).to receive(:delete) allow(runner).to receive(:run) do trigger.wait end expect(task_complete_event).to receive(:wait).at_least(:once) do trigger.set # unblock the task end executor.execute(msg) # first message runs executor.execute(msg) # second message enters queue executor.execute(msg) # third message triggers wait end end end describe '#shutdown' do let(:tp) { double } it 'calls shutdown and waits for termination' do expect(Concurrent::ThreadPoolExecutor).to receive(:new).and_return(tp) executor = Executor.new expect(tp).to receive(:shutdown) expect(tp).to receive(:wait_for_termination).with(5).and_return true executor.shutdown(5) end context 'errors during shutdown' do let(:error_handler) { double } let(:body) { ActiveSupport::JSON.dump(TestJob.new('a1', 'a2').serialize) } let(:msg) { double(data: double(body: body)) } let(:executor) { Executor.new(error_handler: error_handler) } let(:runner) { double('runner', id: 'jobid', class_name: 'jobclass', exception_executions?: false) } it 'handles errors from jobs during shutdown' do expect(JobRunner).to receive(:new).and_return(runner) expect(runner).to receive(:run) do sleep(0.1) raise StandardError end expect(error_handler).to receive(:call) expect(executor).to receive(:shutdown).exactly(1).times.and_call_original executor.execute(msg) executor.shutdown end end context 'lifecycle hooks are registered' do let(:hook) { double } before do allow(hook).to receive(:call) end after do Executor.clear_hooks end it 'executes hook when shutdown' do Aws::ActiveJob::SQS.on_worker_stop do hook.call end executor = Executor.new executor.shutdown expect(hook).to have_received(:call) end end end end end end end