gems/aws-sdk-sqs/spec/queue_poller_spec.rb (440 lines of code) (raw):

# frozen_string_literal: true require_relative 'spec_helper' module Aws module SQS describe QueuePoller do let(:queue_url) { 'https://sqs.us-east-1.amazonaws.com/12345/test' } let(:client) { Client.new(stub_responses: true) } let(:options) { { client: client } } let(:poller) { QueuePoller.new(queue_url, options) } def sample_message(n = nil) suffix = n ? "-#{n}" : '' { message_id: "id#{suffix}", receipt_handle: "rh#{suffix}", body: "body#{suffix}" } end describe 'configuration' do it 'raises an error on unknown configuration options' do expect do QueuePoller.new(queue_url, client: client, bad: 'option') end.to raise_error(ArgumentError, 'invalid option :bad') expect do QueuePoller.new(queue_url, client: client).poll(bad: 'option') { |m| } end.to raise_error(ArgumentError, 'invalid option :bad') end it 'is immutable' do expect(poller.default_config).to be_frozen expect(poller.default_config.request_params).to be_frozen end it 'has reasonable defaults' do expect(poller.queue_url).to eq(queue_url) expect(poller.default_config.idle_timeout).to be(nil) expect(poller.default_config.skip_delete).to be(false) expect(poller.default_config.before_request).to be(nil) expect(poller.default_config.after_empty_receive).to be(nil) expect(poller.default_config.request_params) .to eq( wait_time_seconds: 20, max_number_of_messages: 1, visibility_timeout: nil, attribute_names: ['All'], message_attribute_names: ['All'] ) end it 'accepts configuration options to the constructor' do client = double('client') callback = double('callback') poller = QueuePoller.new( queue_url, { client: client, idle_timeout: 60, skip_delete: true, before_request: callback, after_empty_receive: callback, wait_time_seconds: 10, max_number_of_messages: 10, visibility_timeout: 10, attribute_names: ['attr-name'], message_attribute_names: ['msg-attr-name'] } ) expect(poller.client).to be(client) expect(poller.default_config.idle_timeout).to eq(60) expect(poller.default_config.skip_delete).to be(true) expect(poller.default_config.before_request).to be(callback) expect(poller.default_config.after_empty_receive).to be(callback) expect(poller.default_config.request_params) .to eq( wait_time_seconds: 10, max_number_of_messages: 10, visibility_timeout: 10, attribute_names: ['attr-name'], message_attribute_names: ['msg-attr-name'] ) end context 'max_number_of_messages validation' do subject do QueuePoller.new( queue_url, client: client, max_number_of_messages: max_number_of_messages ) end let(:max_number_of_messages) { 1 } it 'accepts a positive integer' do expect(subject.default_config.request_params[:max_number_of_messages]).to eq(1) end [0, nil, 1.1, '1'].each do |value| context "with `max_number_of_messages: #{value.inspect}`" do let(:max_number_of_messages) { value } it 'raises an error' do expect { subject }.to raise_error(ArgumentError, /positive integer/) end end end end end describe '#poll' do it 'receives messages in a loop' do expect(client).to receive(:receive_message) .exactly(2).times .with( { queue_url: queue_url, wait_time_seconds: 20, max_number_of_messages: 1, visibility_timeout: nil, attribute_names: ['All'], message_attribute_names: ['All'] } ) .and_return(client.stub_data(:receive_message)) poller.before_request do |stats| throw :stop_polling if stats.request_count >= 2 end poller.poll { |msg| } end it 'yields received messages to the block' do client.stub_responses( :receive_message, [ { messages: [sample_message] }, { messages: [] } ] ) yielded = nil poller.poll(idle_timeout: 0) do |msg| yielded = msg end expect(yielded.body).to eq('body') end it 'yields an array when max messages is greater than 1' do client.stub_responses( :receive_message, [ { messages: [ sample_message(1), sample_message(2) ] }, { messages: [] } ] ) yielded = nil poller.poll(idle_timeout: 0, max_number_of_messages: 2) do |messages| yielded = messages end expect(yielded.map(&:body)).to eq(%w[body-1 body-2]) end it 'does not have duplicated messages and given the '\ 'most recently received duplicated message' do message_one = sample_message(1) message_dup = message_one.dup.merge(receipt_handle: 'foo') client.stub_responses( :receive_message, [{ messages: [ message_one, message_dup ] }, { messages: [] }] ) yielded_arr = [] poller.poll(idle_timeout: 0) do |msg| yielded_arr << msg end expect(yielded_arr.count).to eq(1) expect(yielded_arr[0].receipt_handle).to eq(message_dup[:receipt_handle]) end describe 'message deletion' do it 'deletes the message at the end of the block' do expect(client).to receive(:delete_message_batch) .with( queue_url: queue_url, entries: [{ id: 'id', receipt_handle: 'rh' }] ) client.stub_responses( :receive_message, [ { messages: [sample_message] }, { messages: [] } ] ) poller.poll(idle_timeout: 0) { |msg| } end it 'supports batch deletion' do expect(client).to receive(:delete_message_batch) .with( queue_url: queue_url, entries: [ { id: 'id-1', receipt_handle: 'rh-1' }, { id: 'id-2', receipt_handle: 'rh-2' } ] ) client.stub_responses( :receive_message, [ { messages: [ sample_message(1), sample_message(2) ] }, { messages: [] } ] ) poller.poll(idle_timeout: 0) { |msg| } end it 'can skip default delete behavior' do expect(client).not_to receive(:delete_message_batch) client.stub_responses( :receive_message, [ { messages: [sample_message] }, { messages: [] } ] ) poller.poll(idle_timeout: 0, skip_delete: true) { |msg| } end it 'skips delete when :skip_delete is thrown' do expect(client).not_to receive(:delete_message_batch) client.stub_responses( :receive_message, [ { messages: [sample_message] }, { messages: [] } ] ) poller.poll(idle_timeout: 0) { |_msg| throw :skip_delete } end it 'provides the ability to manually delete messages' do expect(client).to receive(:delete_message).with( queue_url: queue_url, receipt_handle: 'rh' ) client.stub_responses( :receive_message, [ { messages: [sample_message] }, { messages: [] } ] ) poller.poll(idle_timeout: 0, skip_delete: true) do |msg| poller.delete_message(msg) end end it 'provides the ability to manually delete message batches' do expect(client).to receive(:delete_message_batch) .with( queue_url: queue_url, entries: [ { id: 'id-1', receipt_handle: 'rh-1' }, { id: 'id-2', receipt_handle: 'rh-2' } ] ) client.stub_responses( :receive_message, [ { messages: [ sample_message(1), sample_message(2) ] }, { messages: [] } ] ) poller.poll(idle_timeout: 0, max_number_of_messages: 10, skip_delete: true) do |messages| poller.delete_messages(messages) end end end describe 'after_empty_receive callback' do it 'calls the callback when an empty message set is received' do client.stub_responses( :receive_message, [ { messages: [sample_message] }, { messages: [] } ] ) resp = nil callback = proc {|stats| } expect(callback).to receive(:call).once { throw :stop_polling } poller.after_empty_receive(&callback) poller.poll{ |msg| } end end describe 'visibility timeouts' do it 'provides a method to update the visibility timeout of a message' do client.stub_responses( :receive_message, [ { messages: [sample_message] }, { messages: [] } ] ) resp = nil poller.poll(idle_timeout: 0) do |msg| resp = poller.change_message_visibility_timeout(msg, 60) end expect(resp.context.operation_name.to_s).to eq('change_message_visibility') expect(resp.context.params) .to eq( queue_url: queue_url, receipt_handle: 'rh', visibility_timeout: 60 ) end end describe 'stop polling' do it 'polls until :stop_polling is thrown from #before_request' do expect(client).to receive(:receive_message) .exactly(10).times .and_return(client.stub_data(:receive_message)) poller.before_request do |stats| throw :stop_polling if stats.request_count == 10 end poller.poll { |msg| } end it 'polls until :idle_timeout seconds have past without messages' do now = Time.now allow(Time).to receive(:now).and_return(now) one_minute_later = now + 61 expect(client).to receive(:receive_message) .exactly(10).times .and_return(client.stub_data(:receive_message)) poller.before_request do |stats| if stats.request_count == 9 allow(Time).to receive(:now) .and_return(one_minute_later) end end poller.poll(idle_timeout: 60) { |msg| } end end describe 'tracking stats' do it 'counts the number of requests made' do client.stub_responses( :receive_message, [ { messages: [sample_message] }, { messages: [] }, { messages: [sample_message] }, { messages: [] }, { messages: [sample_message] } ] ) poller.before_request do |stats| throw :stop_polling if stats.received_message_count == 3 end stats = poller.poll { |msg| } expect(stats.request_count).to eq(5) end it 'counts the number of messages yielded in single mode' do client.stub_responses( :receive_message, [ { messages: [sample_message] }, { messages: [sample_message] }, { messages: [sample_message] }, { messages: [sample_message] }, { messages: [sample_message] }, { messages: [] } ] ) stats = poller.poll(idle_timeout: 0) { |msg| } expect(stats.received_message_count).to eq(5) end it 'counts the number of messages yielded in batch mode' do client.stub_responses( :receive_message, [ { messages: [ sample_message, sample_message, sample_message ] }, { messages: [ sample_message, sample_message, sample_message ] }, { messages: [] } ] ) stats = poller.poll(idle_timeout: 0, max_number_of_messages: 3) { |msgs| } expect(stats.received_message_count).to eq(6) end it 'tracks when a message was most recently received' do client.stub_responses( :receive_message, [ { messages: [sample_message] }, { messages: [] } ] ) stats = poller.poll(idle_timeout: 0) { |msg| } expect(stats.last_message_received_at).to be_kind_of(Time) end it 'has a nil value for last_message_received_at with no messages' do stats = poller.poll(idle_timeout: 0) { |msg| } expect(stats.last_message_received_at).to be(nil) end it 'tracks when polling started' do stats = poller.poll(idle_timeout: 0) { |msg| } expect(stats.polling_started_at).to be_kind_of(Time) end it 'tracks when polling stops' do started = Time.now poller.before_request do |stats| expect(stats.polling_stopped_at).to be(nil) end stats = poller.poll(idle_timeout: 0) { |msg| } expect(stats.polling_stopped_at > started).to be(true) end it 'yields a stats object to #poll' do client.stub_responses( :receive_message, [ { messages: [sample_message] }, { messages: [sample_message] }, { messages: [sample_message] }, { messages: [] } ] ) yielded = nil poller.poll(idle_timeout: 0) do |_msg, stats| yielded = stats end expect(yielded).to be_kind_of(QueuePoller::PollerStats) expect(yielded.request_count).to eq(4) expect(yielded.received_message_count).to eq(3) expect(yielded.last_message_received_at).to be_kind_of(Time) end it 'returns a stats object' do client.stub_responses( :receive_message, [ { messages: [sample_message] }, { messages: [sample_message] }, { messages: [sample_message] }, { messages: [] } ] ) stats = poller.poll(idle_timeout: 0) { |msg| } expect(stats).to be_kind_of(QueuePoller::PollerStats) expect(stats.request_count).to eq(4) expect(stats.received_message_count).to eq(3) expect(stats.last_message_received_at).to be_kind_of(Time) end end end end end end