spec/active_job/queue_adapters/sqs_adapter_spec.rb (198 lines of code) (raw):

# frozen_string_literal: true module ActiveJob module QueueAdapters describe SqsAdapter do let(:client) { double('Client') } before do allow(Aws::ActiveJob::SQS.config).to receive(:client).and_return(client) end it 'enqueues jobs' do expect(client).to receive(:send_message) .with( { queue_url: 'https://queue-url', message_body: instance_of(String), message_attributes: instance_of(Hash) } ) TestJob.perform_later('test') sleep(0.2) end context 'fifo queues' do before do allow(Aws::ActiveJob::SQS.config).to receive(:url_for).and_return('https://queue-url.fifo') end it 'adds message_deduplication_id and default message_group_id if job does not override it' do expect(client).to receive(:send_message) .with( { queue_url: 'https://queue-url.fifo', message_body: instance_of(String), message_attributes: instance_of(Hash), message_group_id: Aws::ActiveJob::SQS.config.message_group_id_for(:default), message_deduplication_id: instance_of(String) } ) TestJob.perform_later('test') sleep(0.2) end context 'when job has excluded deduplication keys defined' do let(:ex_dedup_keys) { %w[job_class queue_name] } let(:ex_dudup_keys_with_job_id) { ex_dedup_keys << 'job_id' } let(:hashed_body) { 'hashed_body' } context 'through #deduplicate_without' do before do TestJobWithDedupKeys.deduplicate_without(*ex_dedup_keys) end it 'adds customized message_deduplication_id' do expect(Digest::SHA256).to receive(:hexdigest) do |body| ex_dudup_keys_with_job_id.each do |key| expect(body).not_to include(%("#{key}")) end end.and_return(hashed_body) expect(client).to receive(:send_message).with( { queue_url: 'https://queue-url.fifo', message_body: instance_of(String), message_attributes: instance_of(Hash), message_group_id: Aws::ActiveJob::SQS.config.message_group_id_for(:default), message_deduplication_id: hashed_body } ) TestJobWithDedupKeys.perform_later('test') sleep(0.2) end end context 'through Aws::ActiveJob::SQS config' do before do Aws::ActiveJob::SQS.configure do |config| config.excluded_deduplication_keys = ex_dedup_keys end end it 'adds customized message_deduplication_id' do expect(Digest::SHA256).to receive(:hexdigest) do |body| ex_dudup_keys_with_job_id.each do |key| expect(body).not_to include(%("#{key}")) end end.and_return(hashed_body) expect(client).to receive(:send_message).with( { queue_url: 'https://queue-url.fifo', message_body: instance_of(String), message_attributes: instance_of(Hash), message_group_id: Aws::ActiveJob::SQS.config.message_group_id_for(:default), message_deduplication_id: hashed_body } ) TestJob.perform_later('test') sleep(0.2) end end end context 'when job has #message_group_id defined' do it 'adds message_deduplication_id and default message_group_id if job does not return a value' do expect(client).to receive(:send_message).with( { queue_url: 'https://queue-url.fifo', message_body: instance_of(String), message_attributes: instance_of(Hash), message_group_id: Aws::ActiveJob::SQS.config.message_group_id_for(:default), message_deduplication_id: instance_of(String) } ) TestJobWithMessageGroupID.perform_later('test') sleep(0.2) end it 'adds message_deduplication_id and given message_group_id if job returns a value' do arg = 'test' dbl = TestJobWithMessageGroupID.new(arg) message_group_id = "mgi_#{rand(0..100)}" expect(client).to receive(:send_message).with( { queue_url: 'https://queue-url.fifo', message_body: instance_of(String), message_attributes: instance_of(Hash), message_group_id: message_group_id, message_deduplication_id: instance_of(String) } ) expect(TestJobWithMessageGroupID).to receive(:new).with(arg).and_return(dbl) expect(dbl).to receive(:message_group_id).and_return(message_group_id) TestJobWithMessageGroupID.perform_later(arg) sleep(0.2) end end end context 'with queue delay' do it 'enqueues jobs with proper delay' do t1 = Time.now allow(Time).to receive(:now).and_return t1 expect(client).to receive(:send_message).with( { queue_url: 'https://queue-url', delay_seconds: 60, message_body: instance_of(String), message_attributes: instance_of(Hash) } ) TestJob.set(wait: 1.minute).perform_later('test') sleep(0.2) end it 'enqueues jobs with zero or negative delay' do t1 = Time.now allow(Time).to receive(:now).and_return t1 expect(client).to receive(:send_message).with( { queue_url: 'https://queue-url', delay_seconds: 0, message_body: instance_of(String), message_attributes: instance_of(Hash) } ).twice TestJob.set(wait: 0).perform_later('test') TestJob.set(wait: -1).perform_later('test') sleep(0.2) end it 'raises an error when job delay is great than SQS support' do t1 = Time.now allow(Time).to receive(:now).and_return t1 expect do TestJob.set(wait: 1.day).perform_later('test') end.to raise_error ArgumentError end end context 'with multiple jobs' do before do response = double('Response') allow(response).to receive(:successful).and_return([1, 2]) allow(client).to receive(:send_message_batch).and_return(response) end it do expect(client).to receive(:send_message_batch).with( { queue_url: 'https://queue-url', entries: [ { delay_seconds: instance_of(Integer), id: instance_of(String), message_body: instance_of(String), message_attributes: instance_of(Hash) }, { delay_seconds: instance_of(Integer), id: instance_of(String), message_body: instance_of(String), message_attributes: instance_of(Hash) } ] } ).once jobs = [ TestJob.new('test').set(wait: 1.minute), TestJob.new('test').set(wait: 1.minute) ] ActiveJob.perform_all_later(jobs) end end end end end